You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/05/17 13:48:42 UTC

[iotdb] branch iotdb-1022-v2 updated (6862068 -> 7771a59)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch iotdb-1022-v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from 6862068  fix aggregation tests
     new c1ff734  fix checkAggregation
     new 3125c46  fix IoTDBUDFManagementIT
     new 7771a59  fix udf tests

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iotdb/db/qp/logical/crud/SelectOperator.java   |  12 +-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |   6 +
 .../iotdb/db/qp/physical/crud/LastQueryPlan.java   |  24 ++++
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  14 +++
 .../db/qp/physical/crud/RawDataQueryPlan.java      |  67 +++++++++++
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |  70 ++++++++++--
 .../iotdb/db/qp/strategy/LogicalChecker.java       |  32 +++++-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 125 ++-------------------
 .../iotdb/db/query/expression/Expression.java      |   4 +
 .../iotdb/db/query/expression/ResultColumn.java    |   8 ++
 .../query/expression/binary/BinaryExpression.java  |   7 ++
 .../query/expression/unary/FunctionExpression.java |   8 ++
 .../db/query/expression/unary/MinusExpression.java |   6 +
 .../query/expression/unary/TimeSeriesOperand.java  |   6 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   9 +-
 .../iotdb/db/integration/IoTDBUDFManagementIT.java |  10 +-
 .../db/integration/IoTDBUDTFHybridQueryIT.java     |   3 +-
 17 files changed, 270 insertions(+), 141 deletions(-)

[iotdb] 02/03: fix IoTDBUDFManagementIT

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1022-v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3125c465c9a2e591a03e246a33f7a8f162d78acc
Author: SteveYurongSu <st...@outlook.com>
AuthorDate: Fri May 14 20:51:36 2021 +0800

    fix IoTDBUDFManagementIT
---
 .../org/apache/iotdb/db/integration/IoTDBUDFManagementIT.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFManagementIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFManagementIT.java
index f85e012..b970eaf 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFManagementIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDFManagementIT.java
@@ -223,7 +223,10 @@ public class IoTDBUDFManagementIT {
       statement.execute("create function aVg as \"org.apache.iotdb.db.query.udf.example.Adder\"");
       fail();
     } catch (SQLException throwable) {
-      assertTrue(throwable.getMessage().contains("expecting ID"));
+      assertTrue(
+          throwable
+              .getMessage()
+              .contains("the given function name conflicts with the built-in function name"));
     }
   }
 
@@ -237,7 +240,10 @@ public class IoTDBUDFManagementIT {
           "create function MAX_VALUE as \"org.apache.iotdb.db.query.udf.example.Adder\"");
       fail();
     } catch (SQLException throwable) {
-      assertTrue(throwable.getMessage().contains("expecting ID"));
+      assertTrue(
+          throwable
+              .getMessage()
+              .contains("the given function name conflicts with the built-in function name"));
     }
   }
 

[iotdb] 01/03: fix checkAggregation

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1022-v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c1ff734ff57e9c31eaff813043b2e7d9e37904ac
Author: SteveYurongSu <st...@outlook.com>
AuthorDate: Fri May 14 19:54:02 2021 +0800

    fix checkAggregation
---
 .../org/apache/iotdb/db/qp/strategy/LogicalChecker.java  | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
index b5a4202..cec031c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
@@ -24,6 +24,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectOperator;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 
 public class LogicalChecker {
 
@@ -51,10 +54,15 @@ public class LogicalChecker {
   private static void checkAggregation(QueryOperator queryOperator)
       throws LogicalOperatorException {
     SelectOperator selectOperator = queryOperator.getSelectOperator();
-    if (!selectOperator.getResultColumns().isEmpty()
-        && selectOperator.getPaths().size() != selectOperator.getAggregationFunctions().size()) {
-      throw new LogicalOperatorException(
-          "Common queries and aggregated queries are not allowed to appear at the same time");
+    if (!selectOperator.hasAggregationFunction()) {
+      return;
+    }
+    for (ResultColumn resultColumn : selectOperator.getResultColumns()) {
+      Expression expression = resultColumn.getExpression();
+      if (expression instanceof TimeSeriesOperand) {
+        throw new LogicalOperatorException(
+            "Common queries and aggregated queries are not allowed to appear at the same time");
+      }
     }
   }
 

[iotdb] 03/03: fix udf tests

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1022-v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7771a59d5319dc240c60e6672ce476b4f413640c
Author: SteveYurongSu <st...@outlook.com>
AuthorDate: Mon May 17 21:48:04 2021 +0800

    fix udf tests
---
 .../iotdb/db/qp/logical/crud/SelectOperator.java   |  12 +-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |   6 +
 .../iotdb/db/qp/physical/crud/LastQueryPlan.java   |  24 ++++
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  14 +++
 .../db/qp/physical/crud/RawDataQueryPlan.java      |  67 +++++++++++
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |  70 ++++++++++--
 .../iotdb/db/qp/strategy/LogicalChecker.java       |  16 +++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 125 ++-------------------
 .../iotdb/db/query/expression/Expression.java      |   4 +
 .../iotdb/db/query/expression/ResultColumn.java    |   8 ++
 .../query/expression/binary/BinaryExpression.java  |   7 ++
 .../query/expression/unary/FunctionExpression.java |   8 ++
 .../db/query/expression/unary/MinusExpression.java |   6 +
 .../query/expression/unary/TimeSeriesOperand.java  |   6 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   9 +-
 .../db/integration/IoTDBUDTFHybridQueryIT.java     |   3 +-
 16 files changed, 250 insertions(+), 135 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
index 12d51ab..14d2c24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
@@ -96,14 +96,10 @@ public final class SelectOperator extends Operator {
       pathsCache = new ArrayList<>();
       for (ResultColumn resultColumn : resultColumns) {
         Expression expression = resultColumn.getExpression();
-        if (expression instanceof TimeSeriesOperand) {
-          pathsCache.add(((TimeSeriesOperand) resultColumn.getExpression()).getPath());
-        } else {
-          TimeSeriesOperand timeSeriesOperand =
-              (TimeSeriesOperand)
-                  ((FunctionExpression) resultColumn.getExpression()).getExpressions().get(0);
-          pathsCache.add(timeSeriesOperand.getPath());
-        }
+        pathsCache.add(
+            expression instanceof TimeSeriesOperand
+                ? ((TimeSeriesOperand) expression).getPath()
+                : null);
       }
     }
     return pathsCache;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index be6ed98..3f79a19 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.crud;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
@@ -53,6 +54,11 @@ public class AlignByDevicePlan extends QueryPlan {
     super();
   }
 
+  @Override
+  public void deduplicate(PhysicalGenerator physicalGenerator) {
+    // do nothing
+  }
+
   public void setMeasurements(List<String> measurements) {
     this.measurements = measurements;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
index 56fdd1c..553feeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java
@@ -19,14 +19,22 @@
 
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class LastQueryPlan extends RawDataQueryPlan {
 
   public LastQueryPlan() {
@@ -35,6 +43,22 @@ public class LastQueryPlan extends RawDataQueryPlan {
   }
 
   @Override
+  public void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException {
+    Set<String> columnForReaderSet = new HashSet<>();
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = paths.get(i);
+      String column = getColumnForReaderFromPath(path, i);
+      if (!columnForReaderSet.contains(column)) {
+        TSDataType seriesType = dataTypes.get(i);
+        addDeduplicatedPaths(path);
+        addDeduplicatedDataTypes(seriesType);
+        columnForReaderSet.add(column);
+      }
+    }
+    transformPaths(IoTDB.metaManager);
+  }
+
+  @Override
   public void setExpression(IExpression expression) throws QueryProcessException {
     if (isValidExpression(expression)) {
       super.setExpression(expression);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 2de90a5..e2db5a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -19,10 +19,13 @@
 package org.apache.iotdb.db.qp.physical.crud;
 
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.HashMap;
@@ -31,6 +34,7 @@ import java.util.Map;
 
 public abstract class QueryPlan extends PhysicalPlan {
 
+  protected List<ResultColumn> resultColumns = null;
   protected List<PartialPath> paths = null;
   protected List<TSDataType> dataTypes = null;
   private boolean alignByTime = true; // for disable align sql
@@ -55,6 +59,8 @@ public abstract class QueryPlan extends PhysicalPlan {
     super(isQuery, operatorType);
   }
 
+  public abstract void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException;
+
   @Override
   public List<PartialPath> getPaths() {
     return paths;
@@ -150,4 +156,12 @@ public abstract class QueryPlan extends PhysicalPlan {
   public void setVectorPathToIndex(Map<String, Integer> vectorPathToIndex) {
     this.vectorPathToIndex = vectorPathToIndex;
   }
+
+  public List<ResultColumn> getResultColumns() {
+    return resultColumns;
+  }
+
+  public void setResultColumns(List<ResultColumn> resultColumns) {
+    this.resultColumns = resultColumns;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 6dc4d60..74b6b25 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -24,12 +24,16 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.VectorPartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -54,6 +58,49 @@ public class RawDataQueryPlan extends QueryPlan {
     super(isQuery, operatorType);
   }
 
+  @Override
+  public void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException {
+    // sort paths by device, to accelerate the metadata read process
+    List<Pair<PartialPath, Integer>> indexedPaths = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      indexedPaths.add(new Pair<>(paths.get(i), i));
+    }
+    indexedPaths.sort(Comparator.comparing(pair -> pair.left));
+
+    Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
+    Set<String> columnForReaderSet = new HashSet<>();
+    Set<String> columnForDisplaySet = new HashSet<>();
+
+    for (Pair<PartialPath, Integer> indexedPath : indexedPaths) {
+      PartialPath originalPath = indexedPath.left;
+      Integer originalIndex = indexedPath.right;
+
+      String columnForReader = getColumnForReaderFromPath(originalPath, originalIndex);
+      if (!columnForReaderSet.contains(columnForReader)) {
+        addDeduplicatedPaths(originalPath);
+        addDeduplicatedDataTypes(dataTypes.get(originalIndex));
+        pathNameToReaderIndex.put(columnForReader, pathNameToReaderIndex.size());
+        if (this instanceof AggregationPlan) {
+          ((AggregationPlan) this)
+              .addDeduplicatedAggregations(getAggregations().get(originalIndex));
+        }
+        columnForReaderSet.add(columnForReader);
+      }
+
+      String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex);
+      if (!columnForDisplaySet.contains(columnForDisplay)) {
+        addPathToIndex(columnForDisplay, getPathToIndex().size());
+        columnForDisplaySet.add(columnForDisplay);
+      }
+    }
+
+    if (!isRawQuery()) {
+      transformPaths(IoTDB.metaManager);
+    } else {
+      transformVectorPaths(physicalGenerator, columnForDisplaySet);
+    }
+  }
+
   public IExpression getExpression() {
     return expression;
   }
@@ -132,6 +179,26 @@ public class RawDataQueryPlan extends QueryPlan {
     }
   }
 
+  public void transformVectorPaths(
+      PhysicalGenerator physicalGenerator, Set<String> columnForDisplaySet)
+      throws MetadataException {
+    Pair<List<PartialPath>, Map<String, Integer>> pair =
+        physicalGenerator.getSeriesSchema(getDeduplicatedPaths());
+
+    List<PartialPath> vectorizedDeduplicatedPaths = pair.left;
+    List<TSDataType> vectorizedDeduplicatedDataTypes =
+        new ArrayList<>(physicalGenerator.getSeriesTypes(vectorizedDeduplicatedPaths));
+    setDeduplicatedVectorPaths(vectorizedDeduplicatedPaths);
+    setDeduplicatedVectorDataTypes(vectorizedDeduplicatedDataTypes);
+
+    Map<String, Integer> columnForDisplayToQueryDataSetIndex = pair.right;
+    Map<String, Integer> pathToIndex = new HashMap<>();
+    for (String columnForDisplay : columnForDisplaySet) {
+      pathToIndex.put(columnForDisplay, columnForDisplayToQueryDataSetIndex.get(columnForDisplay));
+    }
+    setVectorPathToIndex(pathToIndex);
+  }
+
   public List<PartialPath> getDeduplicatedVectorPaths() {
     return deduplicatedVectorPaths;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
index 6eecb73..390bded 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java
@@ -19,20 +19,30 @@
 
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
+import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
@@ -53,15 +63,62 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
   }
 
   @Override
+  public void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException {
+    // sort paths by device, to accelerate the metadata read process
+    List<Pair<PartialPath, Integer>> indexedPaths = new ArrayList<>();
+    for (int i = 0; i < resultColumns.size(); i++) {
+      for (PartialPath path : resultColumns.get(i).collectPaths()) {
+        indexedPaths.add(new Pair<>(path, i));
+      }
+    }
+    indexedPaths.sort(Comparator.comparing(pair -> pair.left));
+
+    Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
+    Set<String> columnForReaderSet = new HashSet<>();
+    Set<String> columnForDisplaySet = new HashSet<>();
+
+    for (Pair<PartialPath, Integer> indexedPath : indexedPaths) {
+      PartialPath originalPath = indexedPath.left;
+      Integer originalIndex = indexedPath.right;
+
+      boolean isUdf =
+          !(resultColumns.get(originalIndex).getExpression() instanceof TimeSeriesOperand);
+
+      String columnForReader = getColumnForReaderFromPath(originalPath, originalIndex);
+      if (!columnForReaderSet.contains(columnForReader)) {
+        addDeduplicatedPaths(originalPath);
+        addDeduplicatedDataTypes(
+            isUdf ? IoTDB.metaManager.getSeriesType(originalPath) : dataTypes.get(originalIndex));
+        pathNameToReaderIndex.put(columnForReader, pathNameToReaderIndex.size());
+        columnForReaderSet.add(columnForReader);
+      }
+
+      String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex);
+      if (!columnForDisplaySet.contains(columnForDisplay)) {
+        addPathToIndex(columnForDisplay, getPathToIndex().size());
+        if (isUdf) {
+          addUdfOutputColumn(columnForDisplay);
+        } else {
+          addRawQueryOutputColumn(columnForDisplay);
+        }
+        columnForDisplaySet.add(columnForDisplay);
+      }
+    }
+
+    setPathNameToReaderIndex(pathNameToReaderIndex);
+  }
+
+  @Override
   public void constructUdfExecutors(List<ResultColumn> resultColumns) {
     for (int i = 0; i < resultColumns.size(); ++i) {
-      FunctionExpression expression = (FunctionExpression) resultColumns.get(i).getExpression();
-      if (expression == null) {
+      Expression expression = resultColumns.get(i).getExpression();
+      if (!(expression instanceof FunctionExpression)) {
         continue;
       }
 
       String columnName = expression.toString();
-      columnName2Executor.computeIfAbsent(columnName, k -> new UDTFExecutor(expression, zoneId));
+      columnName2Executor.computeIfAbsent(
+          columnName, k -> new UDTFExecutor((FunctionExpression) expression, zoneId));
       originalOutputColumnIndex2Executor.put(i, columnName2Executor.get(columnName));
     }
   }
@@ -131,10 +188,9 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan {
 
   @Override
   public String getColumnForDisplay(String columnForReader, int pathIndex) {
-    if (paths.get(pathIndex) == null) {
-      return this.getExecutorByOriginalOutputColumnIndex(pathIndex).getExpression().toString();
-    }
-    return columnForReader;
+    return !(resultColumns.get(pathIndex).getExpression() instanceof TimeSeriesOperand)
+        ? getExecutorByOriginalOutputColumnIndex(pathIndex).getExpression().toString()
+        : columnForReader;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
index cec031c..f55fe7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
@@ -47,16 +47,32 @@ public class LogicalChecker {
 
   private static void checkSelectOperator(QueryOperator queryOperator)
       throws LogicalOperatorException {
+    checkLast(queryOperator);
     checkAggregation(queryOperator);
     checkAlignByDevice(queryOperator);
   }
 
+  private static void checkLast(QueryOperator queryOperator) throws LogicalOperatorException {
+    SelectOperator selectOperator = queryOperator.getSelectOperator();
+    if (!selectOperator.isLastQuery()) {
+      return;
+    }
+
+    for (ResultColumn resultColumn : selectOperator.getResultColumns()) {
+      Expression expression = resultColumn.getExpression();
+      if (!(expression instanceof TimeSeriesOperand)) {
+        throw new LogicalOperatorException("Last queries can only be applied on raw time series.");
+      }
+    }
+  }
+
   private static void checkAggregation(QueryOperator queryOperator)
       throws LogicalOperatorException {
     SelectOperator selectOperator = queryOperator.getSelectOperator();
     if (!selectOperator.hasAggregationFunction()) {
       return;
     }
+
     for (ResultColumn resultColumn : selectOperator.getResultColumns()) {
       Expression expression = resultColumn.getExpression();
       if (expression instanceof TimeSeriesOperand) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 71dc3da..9ea5d3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -122,7 +122,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan;
 import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
 import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
-import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -131,7 +130,6 @@ import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -451,7 +449,7 @@ public class PhysicalGenerator {
     }
   }
 
-  protected List<TSDataType> getSeriesTypes(List<PartialPath> paths) throws MetadataException {
+  public List<TSDataType> getSeriesTypes(List<PartialPath> paths) throws MetadataException {
     return SchemaUtils.getSeriesTypesByPaths(paths);
   }
 
@@ -592,8 +590,15 @@ public class PhysicalGenerator {
       }
       return queryPlan;
     }
+
+    queryPlan.setResultColumns(queryOperator.getSelectOperator().getResultColumns());
+
     try {
-      deduplicate(queryPlan);
+      List<PartialPath> paths = queryPlan.getPaths();
+      List<TSDataType> dataTypes = getSeriesTypes(paths);
+      queryPlan.setDataTypes(dataTypes);
+
+      queryPlan.deduplicate(this);
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
@@ -847,117 +852,7 @@ public class PhysicalGenerator {
     basicOperator.setSinglePath(concatPath);
   }
 
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private void deduplicate(QueryPlan queryPlan) throws MetadataException {
-    // generate dataType first
-    List<PartialPath> paths = queryPlan.getPaths();
-    List<TSDataType> dataTypes = getSeriesTypes(paths);
-    queryPlan.setDataTypes(dataTypes);
-
-    // deduplicate from here
-    if (queryPlan instanceof AlignByDevicePlan) {
-      return;
-    }
-
-    RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan;
-    Set<String> columnForReaderSet = new HashSet<>();
-    // if it's a last query, no need to sort by device
-    if (queryPlan instanceof LastQueryPlan) {
-      for (int i = 0; i < paths.size(); i++) {
-        PartialPath path = paths.get(i);
-        String column = queryPlan.getColumnForReaderFromPath(path, i);
-        if (!columnForReaderSet.contains(column)) {
-          TSDataType seriesType = dataTypes.get(i);
-          rawDataQueryPlan.addDeduplicatedPaths(path);
-          rawDataQueryPlan.addDeduplicatedDataTypes(seriesType);
-          columnForReaderSet.add(column);
-        }
-      }
-      ((LastQueryPlan) queryPlan).transformPaths(IoTDB.metaManager);
-      return;
-    }
-
-    // sort path by device
-    List<Pair<PartialPath, Integer>> indexedPaths = new ArrayList<>();
-    for (int i = 0; i < paths.size(); i++) {
-      PartialPath path = paths.get(i);
-      if (path != null) { // non-udf
-        indexedPaths.add(new Pair<>(paths.get(i), i));
-      } else { // udf
-        FunctionExpression functionExpression =
-            (FunctionExpression)
-                ((UDTFPlan) queryPlan).getExecutorByOriginalOutputColumnIndex(i).getExpression();
-        for (PartialPath udfPath : functionExpression.getPaths()) {
-          indexedPaths.add(new Pair<>(udfPath, i));
-        }
-      }
-    }
-    indexedPaths.sort(Comparator.comparing(pair -> pair.left));
-
-    Map<String, Integer> pathNameToReaderIndex = new HashMap<>();
-    Set<String> columnForDisplaySet = new HashSet<>();
-    for (Pair<PartialPath, Integer> indexedPath : indexedPaths) {
-      PartialPath originalPath = indexedPath.left;
-      Integer originalIndex = indexedPath.right;
-
-      String columnForReader = queryPlan.getColumnForReaderFromPath(originalPath, originalIndex);
-      boolean isUdf = queryPlan instanceof UDTFPlan && paths.get(originalIndex) == null;
-
-      if (!columnForReaderSet.contains(columnForReader)) {
-        rawDataQueryPlan.addDeduplicatedPaths(originalPath);
-        rawDataQueryPlan.addDeduplicatedDataTypes(
-            isUdf ? IoTDB.metaManager.getSeriesType(originalPath) : dataTypes.get(originalIndex));
-        pathNameToReaderIndex.put(columnForReader, pathNameToReaderIndex.size());
-        if (queryPlan instanceof AggregationPlan) {
-          ((AggregationPlan) queryPlan)
-              .addDeduplicatedAggregations(queryPlan.getAggregations().get(originalIndex));
-        }
-        columnForReaderSet.add(columnForReader);
-      }
-
-      String columnForDisplay = queryPlan.getColumnForDisplay(columnForReader, originalIndex);
-
-      if (!columnForDisplaySet.contains(columnForDisplay)) {
-        queryPlan.addPathToIndex(columnForDisplay, queryPlan.getPathToIndex().size());
-        if (queryPlan instanceof UDTFPlan) {
-          if (isUdf) {
-            ((UDTFPlan) queryPlan).addUdfOutputColumn(columnForDisplay);
-          } else {
-            ((UDTFPlan) queryPlan).addRawQueryOutputColumn(columnForDisplay);
-          }
-        }
-        columnForDisplaySet.add(columnForDisplay);
-      }
-    }
-    if (queryPlan instanceof UDTFPlan) {
-      ((UDTFPlan) queryPlan).setPathNameToReaderIndex(pathNameToReaderIndex);
-      return;
-    }
-
-    if (!rawDataQueryPlan.isRawQuery()) {
-      rawDataQueryPlan.transformPaths(IoTDB.metaManager);
-    } else {
-      // support vector
-      List<PartialPath> deduplicatedPaths = rawDataQueryPlan.getDeduplicatedPaths();
-      Pair<List<PartialPath>, Map<String, Integer>> pair = getSeriesSchema(deduplicatedPaths);
-
-      List<PartialPath> vectorizedDeduplicatedPaths = pair.left;
-      List<TSDataType> vectorizedDeduplicatedDataTypes =
-          new ArrayList<>(getSeriesTypes(vectorizedDeduplicatedPaths));
-      rawDataQueryPlan.setDeduplicatedVectorPaths(vectorizedDeduplicatedPaths);
-      rawDataQueryPlan.setDeduplicatedVectorDataTypes(vectorizedDeduplicatedDataTypes);
-
-      Map<String, Integer> columnForDisplayToQueryDataSetIndex = pair.right;
-      Map<String, Integer> pathToIndex = new HashMap<>();
-      for (String columnForDisplay : columnForDisplaySet) {
-        pathToIndex.put(
-            columnForDisplay, columnForDisplayToQueryDataSetIndex.get(columnForDisplay));
-      }
-      queryPlan.setVectorPathToIndex(pathToIndex);
-    }
-  }
-
-  protected Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths)
+  public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths)
       throws MetadataException {
     return IoTDB.metaManager.getSeriesSchemas(paths);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 94f888e..3429787 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
+import java.util.Set;
 
 public abstract class Expression {
 
@@ -40,6 +41,7 @@ public abstract class Expression {
     return isTimeSeriesGeneratingFunctionExpression;
   }
 
+  // TODO: implement this method
   public abstract TSDataType dataType() throws MetadataException;
 
   public abstract void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions);
@@ -47,4 +49,6 @@ public abstract class Expression {
   public abstract void removeWildcards(
       WildcardsRemover wildcardsRemover, List<Expression> resultExpressions)
       throws LogicalOptimizeException;
+
+  public abstract void collectPaths(Set<PartialPath> pathSet);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
index dd4a149..93c2d27 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
@@ -24,7 +24,9 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class ResultColumn {
 
@@ -58,6 +60,12 @@ public class ResultColumn {
     }
   }
 
+  public Set<PartialPath> collectPaths() {
+    Set<PartialPath> pathSet = new HashSet<>();
+    expression.collectPaths(pathSet);
+    return pathSet;
+  }
+
   public Expression getExpression() {
     return expression;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 6f83c60..3091007 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 public abstract class BinaryExpression extends Expression {
 
@@ -102,6 +103,12 @@ public abstract class BinaryExpression extends Expression {
   }
 
   @Override
+  public void collectPaths(Set<PartialPath> pathSet) {
+    leftExpression.collectPaths(pathSet);
+    rightExpression.collectPaths(pathSet);
+  }
+
+  @Override
   public final String toString() {
     return String.format(
         "%s %s %s", leftExpression.toString(), operator(), rightExpression.toString());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index de337a0..b05a6ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -35,6 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 public class FunctionExpression extends Expression {
 
@@ -129,6 +130,13 @@ public class FunctionExpression extends Expression {
     }
   }
 
+  @Override
+  public void collectPaths(Set<PartialPath> pathSet) {
+    for (Expression expression : expressions) {
+      expression.collectPaths(pathSet);
+    }
+  }
+
   public List<TSDataType> getDataTypes() throws MetadataException {
     if (dataTypes == null) {
       dataTypes = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java
index 9ad761d..00518fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 public class MinusExpression extends Expression {
 
@@ -66,6 +67,11 @@ public class MinusExpression extends Expression {
   }
 
   @Override
+  public void collectPaths(Set<PartialPath> pathSet) {
+    expression.collectPaths(pathSet);
+  }
+
+  @Override
   public String toString() {
     return "-" + expression.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index d2ba984..3886abc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.List;
+import java.util.Set;
 
 public class TimeSeriesOperand extends Expression {
 
@@ -74,6 +75,11 @@ public class TimeSeriesOperand extends Expression {
   }
 
   @Override
+  public void collectPaths(Set<PartialPath> pathSet) {
+    pathSet.add(path);
+  }
+
+  @Override
   public String toString() {
     return path.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0637aa2..2e7b8c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -82,6 +82,8 @@ import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
+import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -992,8 +994,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   private void getWideQueryHeaders(
       QueryPlan plan, List<String> respColumns, List<String> columnTypes)
       throws TException, MetadataException {
-    // Restore column header of aggregate to func(column_name), only
-    // support single aggregate function for now
+    List<ResultColumn> resultColumns = plan.getResultColumns();
     List<PartialPath> paths = plan.getPaths();
     List<TSDataType> seriesTypes = new ArrayList<>();
     switch (plan.getOperatorType()) {
@@ -1040,11 +1041,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         UDTFPlan udtfPlan = (UDTFPlan) plan;
         for (int i = 0; i < paths.size(); i++) {
           respColumns.add(
-              paths.get(i) != null
+              !(resultColumns.get(i).getExpression() instanceof FunctionExpression)
                   ? paths.get(i).getFullPath()
                   : udtfPlan.getExecutorByOriginalOutputColumnIndex(i).getExpression().toString());
           seriesTypes.add(
-              paths.get(i) != null
+              !(resultColumns.get(i).getExpression() instanceof FunctionExpression)
                   ? udtfPlan.getDataTypes().get(i)
                   : udtfPlan
                       .getExecutorByOriginalOutputColumnIndex(i)
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java
index 4ab33dd..0710b24 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java
@@ -155,7 +155,8 @@ public class IoTDBUDTFHybridQueryIT {
       statement.executeQuery(sql);
       fail();
     } catch (SQLException throwable) {
-      assertTrue(throwable.getMessage().contains("parsing SQL to physical plan"));
+      assertTrue(
+          throwable.getMessage().contains("Last queries can only be applied on raw time series."));
     }
   }