You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/07/14 01:29:01 UTC

[iotdb] 01/01: Optimize align by device logic

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c2e555d3c0e7fb7a3e40edea6d415cab22590e38
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Jul 14 09:28:24 2021 +0800

    Optimize align by device logic
---
 .../apache/iotdb/cluster/metadata/CMManager.java   |  36 -----
 .../cluster/query/ClusterPhysicalGenerator.java    |  13 +-
 .../cluster/query/reader/ClusterTimeGenerator.java |  14 +-
 .../apache/iotdb/cluster/server/ClientServer.java  |  14 --
 .../iotdb/db/qp/logical/crud/QueryOperator.java    | 175 +++++++++------------
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  57 ++-----
 .../iotdb/db/qp/physical/crud/MeasurementInfo.java |  75 +++++++++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  21 ---
 .../db/query/dataset/AlignByDeviceDataSet.java     |  20 ++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  14 +-
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |   3 +-
 .../org/apache/iotdb/db/integration/IoTDBAsIT.java |   2 +-
 .../db/integration/IoTDBQueryMemoryControlIT.java  |  20 +--
 13 files changed, 196 insertions(+), 268 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index f01870c..73d27d2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -943,42 +943,6 @@ public class CMManager extends MManager {
   }
 
   /**
-   * Get the data types of "paths". If "aggregation" is not null, every path will use the
-   * aggregation. First get types locally and if some paths does not exists, pull them from other
-   * nodes.
-   *
-   * @return the left one of the pair is the column types (considering aggregation), the right one
-   *     of the pair is the measurement types (not considering aggregation)
-   */
-  public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPaths(
-      List<PartialPath> pathStrs, String aggregation) throws MetadataException {
-    try {
-      return getSeriesTypesByPathsLocally(pathStrs, aggregation);
-    } catch (PathNotExistException e) {
-      // pull schemas remotely and cache them
-      metaPuller.pullTimeSeriesSchemas(pathStrs, null);
-      return getSeriesTypesByPathsLocally(pathStrs, aggregation);
-    }
-  }
-
-  private Pair<List<TSDataType>, List<TSDataType>> getSeriesTypesByPathsLocally(
-      List<PartialPath> pathStrs, String aggregation) throws MetadataException {
-    List<TSDataType> measurementDataTypes =
-        SchemaUtils.getSeriesTypesByPaths(pathStrs, (String) null);
-    // if the aggregation function is null, the type of column in result set
-    // is equal to the real type of the measurement
-    if (aggregation == null) {
-      return new Pair<>(measurementDataTypes, measurementDataTypes);
-    } else {
-      // if the aggregation function is not null,
-      // we should recalculate the type of column in result set
-      List<TSDataType> columnDataTypes =
-          SchemaUtils.getAggregatedDataTypes(measurementDataTypes, aggregation);
-      return new Pair<>(columnDataTypes, measurementDataTypes);
-    }
-  }
-
-  /**
    * Get the data types of "paths". If "aggregations" is not null, each one of it correspond to one
    * in "paths". First get types locally and if some paths does not exists, pull them from other
    * nodes.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
index 3141d18..c237ca3a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPhysicalGenerator.java
@@ -44,6 +44,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -57,14 +58,12 @@ public class ClusterPhysicalGenerator extends PhysicalGenerator {
   }
 
   @Override
-  public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(
-      List<PartialPath> paths, String aggregation) throws MetadataException {
-    return getCMManager().getSeriesTypesByPaths(paths, aggregation);
-  }
-
-  @Override
   public List<TSDataType> getSeriesTypes(List<PartialPath> paths) throws MetadataException {
-    return getCMManager().getSeriesTypesByPaths(paths, null).left;
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (PartialPath path : paths) {
+      dataTypes.add(path == null ? null : IoTDB.metaManager.getSeriesType(path));
+    }
+    return dataTypes;
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
index e0ea6ab..d1b38e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterTimeGenerator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.cluster.query.reader;
 
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
-import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
@@ -48,7 +47,6 @@ import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 
 public class ClusterTimeGenerator extends ServerTimeGenerator {
@@ -113,11 +111,7 @@ public class ClusterTimeGenerator extends ServerTimeGenerator {
     TSDataType dataType;
     ManagedSeriesReader mergeReader;
     try {
-      dataType =
-          ((CMManager) IoTDB.metaManager)
-              .getSeriesTypesByPaths(Collections.singletonList(path), null)
-              .left
-              .get(0);
+      dataType = IoTDB.metaManager.getSeriesType(path);
       mergeReader =
           readerFactory.getSeriesReader(
               path,
@@ -179,11 +173,7 @@ public class ClusterTimeGenerator extends ServerTimeGenerator {
     PartialPath path = (PartialPath) expression.getSeriesPath();
     TSDataType dataType;
     try {
-      dataType =
-          ((CMManager) IoTDB.metaManager)
-              .getSeriesTypesByPaths(Collections.singletonList(path), null)
-              .left
-              .get(0);
+      dataType = IoTDB.metaManager.getSeriesType(path);
 
       List<PartitionGroup> partitionGroups = metaGroupMember.routeFilter(null, path);
       for (PartitionGroup partitionGroup : partitionGroups) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 1200c2d..2dda983 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -262,20 +262,6 @@ public class ClientServer extends TSServiceImpl {
   }
 
   /**
-   * Get the data types of each path in “paths”. If "aggregation" is not null, all "paths" will use
-   * this aggregation.
-   *
-   * @param paths full timeseries paths
-   * @param aggregation if not null, it means "paths" all use this aggregation
-   * @return the data types of "paths" (using the aggregation)
-   * @throws MetadataException
-   */
-  protected List<TSDataType> getSeriesTypesByString(List<PartialPath> paths, String aggregation)
-      throws MetadataException {
-    return ((CMManager) IoTDB.metaManager).getSeriesTypesByPaths(paths, aggregation).left;
-  }
-
-  /**
    * Generate and cache a QueryContext using "queryId". In the distributed version, the QueryContext
    * is a RemoteQueryContext.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index f2668d3..1de305c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
+import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
@@ -40,7 +41,6 @@ import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -50,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.iotdb.db.utils.SchemaUtils.getAggregationType;
+
 public class QueryOperator extends Operator {
 
   protected SelectComponent selectComponent;
@@ -221,130 +223,89 @@ public class QueryOperator extends Operator {
     // remove stars in fromPaths and get deviceId with deduplication
     List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
     List<ResultColumn> resultColumns = selectComponent.getResultColumns();
-    List<String> originAggregations = selectComponent.getAggregationFunctions();
+    List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
 
     // to record result measurement columns
     List<String> measurements = new ArrayList<>();
-    Map<String, String> measurementAliasMap = new HashMap<>();
-    // to check the same measurement of different devices having the same datatype
-    // record the data type of each column of result set
-    Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
-    Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
-
-    // to record the real type of the corresponding measurement
-    Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+    Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
     List<PartialPath> paths = new ArrayList<>();
 
     for (int i = 0; i < resultColumns.size(); i++) { // per suffix in SELECT
       ResultColumn resultColumn = resultColumns.get(i);
       Expression suffixExpression = resultColumn.getExpression();
-      PartialPath suffixPath =
-          suffixExpression instanceof TimeSeriesOperand
-              ? ((TimeSeriesOperand) suffixExpression).getPath()
-              : (((FunctionExpression) suffixExpression).getPaths().get(0));
-
-      // to record measurements in the loop of a suffix path
-      Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+      PartialPath suffixPath = getSuffixPathFromExpression(suffixExpression);
+      String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null;
 
       // if const measurement
       if (suffixPath.getMeasurement().startsWith("'")) {
-        measurements.add(suffixPath.getMeasurement());
-        measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
+        String measurementName = suffixPath.getMeasurement();
+        measurements.add(measurementName);
+        measurementInfoMap.put(measurementName, new MeasurementInfo(MeasurementType.Constant));
         continue;
       }
 
-      for (PartialPath device : devices) { // per device in FROM after deduplication
+      // to record measurements in the loop of a suffix path
+      Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+      for (PartialPath device : devices) {
         PartialPath fullPath = device.concatPath(suffixPath);
         try {
           // remove stars in SELECT to get actual paths
           List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
-          if (resultColumn.hasAlias()) {
-            if (actualPaths.size() == 1) {
-              String columnName = actualPaths.get(0).getMeasurement();
-              if (originAggregations != null && !originAggregations.isEmpty()) {
-                measurementAliasMap.put(
-                    originAggregations.get(i) + "(" + columnName + ")", resultColumn.getAlias());
-              } else {
-                measurementAliasMap.put(columnName, resultColumn.getAlias());
-              }
-            } else if (actualPaths.size() >= 2) {
-              throw new QueryProcessException(
-                  "alias '"
-                      + resultColumn.getAlias()
-                      + "' can only be matched with one time series");
-            }
+          if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
+            throw new QueryProcessException(
+                String.format(
+                    "alias %s can only be matched with one time series", resultColumn.getAlias()));
           }
 
-          // for actual non exist path
-          if (originAggregations != null && actualPaths.isEmpty() && originAggregations.isEmpty()) {
-            String nonExistMeasurement = fullPath.getMeasurement();
-            if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
-                && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
-              measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
+          if (actualPaths.isEmpty()) {
+            String nonExistMeasurement = getMeasurementName(fullPath.getMeasurement(), aggregation);
+            if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
+              measurementInfoMap.putIfAbsent(
+                  nonExistMeasurement, new MeasurementInfo(MeasurementType.NonExist));
             }
-          }
+          } else {
+            for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
+              PartialPath path = actualPaths.get(pathIdx);
+              String measurementName = getMeasurementName(path.getMeasurement(), aggregation);
+              // Data type with aggregation function `columnDataTypes` is used for:
+              //  1. Data type consistency check 2. Header calculation, output result set
+              // The actual data type of the time series `measurementDataTypes` is used for
+              //  the actual query in the AlignByDeviceDataSet
+              TSDataType measurementDataType = IoTDB.metaManager.getSeriesType(path);
+              TSDataType columnDataType =
+                  aggregation == null ? measurementDataType : getAggregationType(aggregation);
+              MeasurementInfo measurementInfo =
+                  measurementInfoMap.getOrDefault(measurementName, new MeasurementInfo());
+
+              if (resultColumn.hasAlias()) {
+                measurementInfo.setMeasurementAlias(resultColumn.getAlias());
+              }
 
-          // Get data types with and without aggregate functions (actual time series) respectively
-          // Data type with aggregation function `columnDataTypes` is used for:
-          //  1. Data type consistency check 2. Header calculation, output result set
-          // The actual data type of the time series `measurementDataTypes` is used for
-          //  the actual query in the AlignByDeviceDataSet
-          String aggregation =
-              originAggregations != null && !originAggregations.isEmpty()
-                  ? originAggregations.get(i)
-                  : null;
-
-          Pair<List<TSDataType>, List<TSDataType>> pair =
-              generator.getSeriesTypes(actualPaths, aggregation);
-          List<TSDataType> columnDataTypes = pair.left;
-          List<TSDataType> measurementDataTypes = pair.right;
-          for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
-            PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
-
-            // check datatype consistency
-            // a example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by
-            // device,
-            // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
-            String measurementChecked;
-            if (originAggregations != null && !originAggregations.isEmpty()) {
-              measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
-            } else {
-              measurementChecked = path.getMeasurement();
-            }
-            TSDataType columnDataType = columnDataTypes.get(pathIdx);
-            if (columnDataTypeMap.containsKey(measurementChecked)) {
-              if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
-                throw new QueryProcessException(
-                    "The data types of the same measurement column should be the same across "
-                        + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
-                        + "SQL document.");
+              // check datatype consistency
+              // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
+              // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+              if (measurementInfo.getColumnDataType() != null) {
+                if (!columnDataType.equals(measurementInfo.getColumnDataType())) {
+                  throw new QueryProcessException(
+                      "The data types of the same measurement column should be the same across devices.");
+                }
+              } else {
+                measurementInfo.setColumnDataType(columnDataType);
+                measurementInfo.setMeasurementDataType(measurementDataType);
               }
-            } else {
-              columnDataTypeMap.put(measurementChecked, columnDataType);
-              measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
-            }
 
-            // This step indicates that the measurement exists under the device and is correct,
-            // First, update measurementSetOfGivenSuffix which is distinct
-            // Then if this measurement is recognized as NonExist before,update it to Exist
-            if (measurementSetOfGivenSuffix.add(measurementChecked)
-                || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
-              measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
+              measurementSetOfGivenSuffix.add(measurementName);
+              measurementInfo.setMeasurementType(MeasurementType.Exist);
+              measurementInfoMap.put(measurementName, measurementInfo);
+              // update paths
+              paths.add(path);
             }
-
-            // update paths
-            paths.add(path);
           }
-
         } catch (MetadataException | QueryProcessException e) {
-          throw new LogicalOptimizeException(
-              String.format(
-                      "Error when getting all paths of a full path: %s", fullPath.getFullPath())
-                  + e.getMessage());
+          throw new QueryProcessException(e.getMessage());
         }
       }
 
-      // update measurements
       // Note that in the loop of a suffix path, set is used.
       // And across the loops of suffix paths, list is used.
       // e.g. select *,s1 from root.sg.d0, root.sg.d1
@@ -355,20 +316,17 @@ public class QueryOperator extends Operator {
     }
 
     convertSpecialClauseValues(alignByDevicePlan);
-    // slimit trim on the measurementColumnList
+    // sLimit trim on the measurementColumnList
     if (specialClauseComponent.hasSlimit()) {
-      int seriesSlimit = specialClauseComponent.getSeriesLimit();
+      int seriesSLimit = specialClauseComponent.getSeriesLimit();
       int seriesOffset = specialClauseComponent.getSeriesOffset();
-      measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
+      measurements = slimitTrimColumn(measurements, seriesSLimit, seriesOffset);
     }
 
     // assigns to alignByDevicePlan
     alignByDevicePlan.setMeasurements(measurements);
-    alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
+    alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
     alignByDevicePlan.setDevices(devices);
-    alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
-    alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
-    alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
     alignByDevicePlan.setPaths(paths);
 
     // get deviceToFilterMap
@@ -407,6 +365,19 @@ public class QueryOperator extends Operator {
     return retDevices;
   }
 
+  private PartialPath getSuffixPathFromExpression(Expression expression) {
+    return expression instanceof TimeSeriesOperand
+        ? ((TimeSeriesOperand) expression).getPath()
+        : (((FunctionExpression) expression).getPaths().get(0));
+  }
+
+  private String getMeasurementName(String initialMeasurement, String aggregation) {
+    if (aggregation != null) {
+      initialMeasurement = aggregation + "(" + initialMeasurement + ")";
+    }
+    return initialMeasurement;
+  }
+
   private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset)
       throws QueryProcessException {
     int size = columnList.size();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 55a75eb..fd1bebd 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
 import java.util.List;
@@ -30,23 +29,15 @@ import java.util.Map;
 
 public class AlignByDevicePlan extends QueryPlan {
 
-  private List<String>
-      measurements; // to record result measurement columns, e.g. temperature, status, speed
-  private Map<String, String>
-      measurementAliasMap; // select s1, s2 as speed from root, then s2 -> speed
+  // to record result measurement columns, e.g. temperature, status, speed
+  private List<String> measurements;
+  private Map<String, MeasurementInfo> measurementInfoMap;
+
   // to check data type consistency for the same name sensor of different devices
   private List<PartialPath> devices;
-  // to record the datatype of the column in the result set
-  private Map<String, TSDataType> columnDataTypeMap;
   private Map<String, IExpression> deviceToFilterMap;
-  // to record different kinds of measurement
-  private Map<String, MeasurementType> measurementTypeMap;
-
-  // to record the real type of the measurement
-  private Map<String, TSDataType> measurementDataTypeMap;
 
   private GroupByTimePlan groupByTimePlan;
-
   private FillQueryPlan fillQueryPlan;
   private AggregationPlan aggregationPlan;
 
@@ -67,14 +58,6 @@ public class AlignByDevicePlan extends QueryPlan {
     return measurements;
   }
 
-  public void setMeasurementAliasMap(Map<String, String> measurementAliasMap) {
-    this.measurementAliasMap = measurementAliasMap;
-  }
-
-  public Map<String, String> getMeasurementAliasMap() {
-    return measurementAliasMap;
-  }
-
   public void setDevices(List<PartialPath> devices) {
     this.devices = devices;
   }
@@ -83,14 +66,6 @@ public class AlignByDevicePlan extends QueryPlan {
     return devices;
   }
 
-  public void setColumnDataTypeMap(Map<String, TSDataType> columnDataTypeMap) {
-    this.columnDataTypeMap = columnDataTypeMap;
-  }
-
-  public Map<String, TSDataType> getColumnDataTypeMap() {
-    return columnDataTypeMap;
-  }
-
   public Map<String, IExpression> getDeviceToFilterMap() {
     return deviceToFilterMap;
   }
@@ -99,22 +74,6 @@ public class AlignByDevicePlan extends QueryPlan {
     this.deviceToFilterMap = deviceToFilterMap;
   }
 
-  public Map<String, MeasurementType> getMeasurementTypeMap() {
-    return measurementTypeMap;
-  }
-
-  public void setMeasurementTypeMap(Map<String, MeasurementType> measurementTypeMap) {
-    this.measurementTypeMap = measurementTypeMap;
-  }
-
-  public Map<String, TSDataType> getMeasurementDataTypeMap() {
-    return measurementDataTypeMap;
-  }
-
-  public void setMeasurementDataTypeMap(Map<String, TSDataType> measurementDataTypeMap) {
-    this.measurementDataTypeMap = measurementDataTypeMap;
-  }
-
   public GroupByTimePlan getGroupByTimePlan() {
     return groupByTimePlan;
   }
@@ -142,6 +101,14 @@ public class AlignByDevicePlan extends QueryPlan {
     this.setOperatorType(Operator.OperatorType.AGGREGATION);
   }
 
+  public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
+    this.measurementInfoMap = measurementInfoMap;
+  }
+
+  public Map<String, MeasurementInfo> getMeasurementInfoMap() {
+    return measurementInfoMap;
+  }
+
   /**
    * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
    * that do not exist in any device, data type is considered as String. The value is considered as
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
new file mode 100644
index 0000000..453ad36
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical.crud;
+
+import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class MeasurementInfo {
+
+  public MeasurementInfo() {}
+
+  public MeasurementInfo(MeasurementType measurementType) {
+    this.measurementType = measurementType;
+  }
+
+  // select s1, s2 as speed from root, then s2 -> speed
+  private String measurementAlias;
+
+  // to record different kinds of measurement
+  private MeasurementType measurementType;
+
+  // to record the real type of the measurement
+  private TSDataType measurementDataType;
+
+  // to record the datatype of the column in the result set
+  private TSDataType columnDataType;
+
+  public void setMeasurementAlias(String measurementAlias) {
+    this.measurementAlias = measurementAlias;
+  }
+
+  public String getMeasurementAlias() {
+    return measurementAlias;
+  }
+
+  public void setMeasurementType(MeasurementType measurementType) {
+    this.measurementType = measurementType;
+  }
+
+  public MeasurementType getMeasurementType() {
+    return measurementType;
+  }
+
+  public void setMeasurementDataType(TSDataType measurementDataType) {
+    this.measurementDataType = measurementDataType;
+  }
+
+  public TSDataType getMeasurementDataType() {
+    return measurementDataType;
+  }
+
+  public void setColumnDataType(TSDataType columnDataType) {
+    this.columnDataType = columnDataType;
+  }
+
+  public TSDataType getColumnDataType() {
+    return columnDataType;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index e2be172..242810b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -57,27 +57,6 @@ public class PhysicalGenerator {
     }
   }
 
-  /**
-   * get types for path list
-   *
-   * @return pair.left is the type of column in result set, pair.right is the real type of the
-   *     measurement
-   */
-  public Pair<List<TSDataType>, List<TSDataType>> getSeriesTypes(
-      List<PartialPath> paths, String aggregation) throws MetadataException {
-    List<TSDataType> measurementDataTypes = SchemaUtils.getSeriesTypesByPaths(paths, (String) null);
-    // if the aggregation function is null, the type of column in result set
-    // is equal to the real type of the measurement
-    if (aggregation == null) {
-      return new Pair<>(measurementDataTypes, measurementDataTypes);
-    } else {
-      // if the aggregation function is not null,
-      // we should recalculate the type of column in result set
-      List<TSDataType> columnDataTypes = SchemaUtils.getSeriesTypesByPaths(paths, aggregation);
-      return new Pair<>(columnDataTypes, measurementDataTypes);
-    }
-  }
-
   public List<TSDataType> getSeriesTypes(List<PartialPath> paths) throws MetadataException {
     return SchemaUtils.getSeriesTypesByPaths(paths);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 829f3f1..a34dd0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
@@ -50,6 +51,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 /** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */
@@ -63,9 +65,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private List<String> measurements;
   private List<PartialPath> devices;
   private Map<String, IExpression> deviceToFilterMap;
-  private Map<String, MeasurementType> measurementTypeMap;
-  // record the real type of the corresponding measurement
-  private Map<String, TSDataType> measurementDataTypeMap;
+  private Map<String, MeasurementInfo> measurementInfoMap;
 
   private GroupByTimePlan groupByTimePlan;
   private FillQueryPlan fillQueryPlan;
@@ -85,11 +85,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
 
     this.measurements = alignByDevicePlan.getMeasurements();
     this.devices = alignByDevicePlan.getDevices();
-    this.measurementDataTypeMap = alignByDevicePlan.getMeasurementDataTypeMap();
+    this.measurementInfoMap = alignByDevicePlan.getMeasurementInfoMap();
     this.queryRouter = queryRouter;
     this.context = context;
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
-    this.measurementTypeMap = alignByDevicePlan.getMeasurementTypeMap();
 
     switch (alignByDevicePlan.getOperatorType()) {
       case GROUP_BY_TIME:
@@ -143,7 +142,11 @@ public class AlignByDeviceDataSet extends QueryDataSet {
       List<PartialPath> executePaths = new ArrayList<>();
       List<TSDataType> tsDataTypes = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (String column : measurementDataTypeMap.keySet()) {
+      for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) {
+        if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
+          continue;
+        }
+        String column = entry.getKey();
         String measurement = column;
         if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
           measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
@@ -154,7 +157,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
         if (measurementOfGivenDevice.contains(measurement)) {
           executeColumns.add(column);
           executePaths.add(currentDevice.concatNode(measurement));
-          tsDataTypes.add(measurementDataTypeMap.get(column));
+          tsDataTypes.add(measurementInfoMap.get(column).getMeasurementDataType());
         }
       }
 
@@ -263,7 +266,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     for (String measurement : measurements) {
-      switch (measurementTypeMap.get(measurement)) {
+      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
+      switch (measurementInfo.getMeasurementType()) {
         case Exist:
           if (currentColumnMap.get(measurement) != null) {
             rowRecord.addField(currentColumnMap.get(measurement));
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 0616f61..cdad7fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
@@ -57,6 +56,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
@@ -1009,23 +1009,23 @@ public class TSServiceImpl implements TSIService.Iface {
     deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
 
     Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
-    Map<String, TSDataType> measurementDataTypeMap = plan.getColumnDataTypeMap();
+    Map<String, MeasurementInfo> measurementInfoMap = plan.getMeasurementInfoMap();
 
     // build column header with constant and non exist column and deduplication
     List<String> measurements = plan.getMeasurements();
-    Map<String, String> measurementAliasMap = plan.getMeasurementAliasMap();
-    Map<String, MeasurementType> measurementTypeMap = plan.getMeasurementTypeMap();
     for (String measurement : measurements) {
+      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
       TSDataType type = TSDataType.TEXT;
-      switch (measurementTypeMap.get(measurement)) {
+      switch (measurementInfo.getMeasurementType()) {
         case Exist:
-          type = measurementDataTypeMap.get(measurement);
+          type = measurementInfo.getColumnDataType();
           break;
         case NonExist:
         case Constant:
           type = TSDataType.TEXT;
       }
-      respColumns.add(measurementAliasMap.getOrDefault(measurement, measurement));
+      String measurementAlias = measurementInfo.getMeasurementAlias();
+      respColumns.add(measurementAlias != null ? measurementAlias : measurement);
       columnTypes.add(type.toString());
 
       if (!deduplicatedMeasurements.contains(measurement)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index a57d6d3..f901b97 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -698,8 +698,7 @@ public class IoTDBAlignByDeviceIT {
       Assert.assertTrue(
           e.getMessage()
               .contains(
-                  "The data types of the same measurement column should be the same across devices in "
-                      + "ALIGN_BY_DEVICE sql."));
+                  "The data types of the same measurement column should be the same across devices."));
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAsIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAsIT.java
index 2ab39c9..5116c63 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAsIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAsIT.java
@@ -424,7 +424,7 @@ public class IoTDBAsIT {
       fail();
     } catch (Exception e) {
       Assert.assertTrue(
-          e.getMessage().contains("alias 'speed' can only be matched with one time series"));
+          e.getMessage().contains("alias speed can only be matched with one time series"));
     }
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryMemoryControlIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryMemoryControlIT.java
index 05e3f14..5ba8a8e 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryMemoryControlIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBQueryMemoryControlIT.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.sql.Connection;
@@ -39,8 +39,6 @@ import static org.junit.Assert.fail;
 
 public class IoTDBQueryMemoryControlIT {
 
-  private int defaultMaxQueryDeduplicatedPathNum;
-
   private static final String[] sqls =
       new String[] {
         "set storage group to root.ln",
@@ -61,10 +59,8 @@ public class IoTDBQueryMemoryControlIT {
         "create timeseries root.ln.wf03.wt05 with datatype=TEXT,encoding=PLAIN",
       };
 
-  @Before
-  public void setUp() throws Exception {
-    defaultMaxQueryDeduplicatedPathNum =
-        IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum();
+  @BeforeClass
+  public static void setUp() throws Exception {
     IoTDBDescriptor.getInstance().getConfig().setMaxQueryDeduplicatedPathNum(10);
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
@@ -84,12 +80,10 @@ public class IoTDBQueryMemoryControlIT {
     }
   }
 
-  @After
-  public void tearDown() throws Exception {
+  @AfterClass
+  public static void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
-    IoTDBDescriptor.getInstance()
-        .getConfig()
-        .setMaxQueryDeduplicatedPathNum(defaultMaxQueryDeduplicatedPathNum);
+    IoTDBDescriptor.getInstance().getConfig().setMaxQueryDeduplicatedPathNum(1000);
   }
 
   @Test