You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/19 10:32:16 UTC

[iotdb] branch alignbydevice created (now b297a99)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at b297a99  recomplete align by device

This branch includes the following new commits:

     new b297a99  recomplete align by device

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: recomplete align by device

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b297a99f8de18712470552e32ddcd568be36bc7a
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Nov 19 18:27:24 2021 +0800

    recomplete align by device
---
 .../iotdb/db/qp/logical/crud/QueryOperator.java    | 127 +++++++++++----------
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  86 ++++++++------
 .../iotdb/db/qp/physical/crud/MeasurementInfo.java |  37 ++----
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  11 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |  98 ++++------------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  29 ++---
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |   5 +-
 7 files changed, 165 insertions(+), 228 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 0d2fb81..c4b5040 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -173,8 +172,18 @@ public class QueryOperator extends Operator {
   }
 
   public void check() throws LogicalOperatorException {
-    if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) {
-      throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries.");
+    if (isAlignByDevice()) {
+      if (selectComponent.hasTimeSeriesGeneratingFunction()) {
+        throw new LogicalOperatorException(
+            "ALIGN BY DEVICE clause is not supported in UDF queries.");
+      }
+
+      for (PartialPath path : selectComponent.getPaths()) {
+        String device = path.getDevice();
+        if (!device.isEmpty()) {
+          throw new LogicalOperatorException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
+        }
+      }
     }
   }
 
@@ -230,74 +239,54 @@ public class QueryOperator extends Operator {
       throws QueryProcessException {
     AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
 
-    List<PartialPath> prefixPaths = fromComponent.getPrefixPaths();
     // remove stars in fromPaths and get deviceId with deduplication
-    List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
-    List<ResultColumn> resultColumns = selectComponent.getResultColumns();
+    List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths());
+    List<ResultColumn> resultColumns =
+        convertSpecialClauseValues(alignByDevicePlan, selectComponent.getResultColumns());
     List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
     // to record result measurement columns
     List<String> measurements = new ArrayList<>();
     Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
     List<PartialPath> paths = new ArrayList<>();
+    List<String> aggregations = new ArrayList<>();
 
-    for (int i = 0; i < resultColumns.size(); i++) { // per suffix in SELECT
+    // per suffix in SELECT
+    for (int i = 0; i < resultColumns.size(); i++) {
       ResultColumn resultColumn = resultColumns.get(i);
-      Expression suffixExpression = resultColumn.getExpression();
-      PartialPath suffixPath = getSuffixPathFromExpression(suffixExpression);
+      PartialPath suffixPath = getSuffixPathFromExpression(resultColumn.getExpression());
       String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null;
-
       // to record measurements in the loop of a suffix path
       Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+      // concat suffix with per device
       for (PartialPath device : devices) {
         PartialPath fullPath = device.concatPath(suffixPath);
         try {
           // remove stars in SELECT to get actual paths
           List<MeasurementPath> actualPaths = getMatchedTimeseries(fullPath);
-          if (suffixPath.getNodes().length > 1) {
-            throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
-          }
           if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
             throw new QueryProcessException(
                 String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias()));
           }
-          if (actualPaths.isEmpty()) {
-            String nonExistMeasurement = getMeasurementName(fullPath, aggregation);
-            if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
-              measurementInfoMap.putIfAbsent(
-                  nonExistMeasurement, new MeasurementInfo(MeasurementType.NonExist));
+          for (MeasurementPath path : actualPaths) {
+            MeasurementInfo measurementInfo =
+                new MeasurementInfo(getMeasurementName(path, aggregation));
+            TSDataType columnDataType = path.getSeriesType();
+            if (aggregation != null) {
+              columnDataType = getAggregationType(aggregation);
+              aggregations.add(aggregation);
             }
-          } else {
-            for (PartialPath path : actualPaths) {
-              String measurementName = getMeasurementName(path, aggregation);
-              TSDataType measurementDataType = path.getSeriesType();
-              TSDataType columnDataType = getAggregationType(aggregation);
-              columnDataType = columnDataType == null ? measurementDataType : columnDataType;
-              MeasurementInfo measurementInfo =
-                  measurementInfoMap.getOrDefault(measurementName, new MeasurementInfo());
-
-              if (resultColumn.hasAlias()) {
-                measurementInfo.setMeasurementAlias(resultColumn.getAlias());
-              }
-
-              // check datatype consistency
-              // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
-              // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
-              if (measurementInfo.getColumnDataType() != null) {
-                if (!columnDataType.equals(measurementInfo.getColumnDataType())) {
-                  throw new QueryProcessException(
-                      "The data types of the same measurement column should be the same across devices.");
-                }
-              } else {
-                measurementInfo.setColumnDataType(columnDataType);
-                measurementInfo.setMeasurementDataType(measurementDataType);
-              }
-
-              measurementSetOfGivenSuffix.add(measurementName);
-              measurementInfo.setMeasurementType(MeasurementType.Exist);
-              measurementInfoMap.put(measurementName, measurementInfo);
-              // update paths
-              paths.add(path);
+            checkDataTypeConsistency(
+                columnDataType, measurementInfoMap.get(measurementInfo.getMeasurement()));
+
+            if (!measurementInfoMap.containsKey(measurementInfo.getMeasurement())) {
+              measurementInfo.setMeasurementAlias(
+                  resultColumn.hasAlias() ? resultColumn.getAlias() : null);
+              measurementInfo.setColumnDataType(columnDataType);
+              measurementInfoMap.put(measurementInfo.getMeasurement(), measurementInfo);
             }
+            measurementSetOfGivenSuffix.add(measurementInfo.getMeasurement());
+            paths.add(path);
           }
         } catch (MetadataException | QueryProcessException e) {
           throw new QueryProcessException(e.getMessage());
@@ -313,22 +302,33 @@ public class QueryOperator extends Operator {
       measurements.addAll(measurementSetOfGivenSuffix);
     }
 
-    List<String> trimMeasurements = convertSpecialClauseValues(alignByDevicePlan, measurements);
     // assigns to alignByDevicePlan
-    alignByDevicePlan.setMeasurements(trimMeasurements);
-    alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
-    alignByDevicePlan.setDevices(devices);
+    alignByDevicePlan.setMeasurements(measurements);
     alignByDevicePlan.setPaths(paths);
+    alignByDevicePlan.setAggregations(aggregations);
+    alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
     alignByDevicePlan.setEnableTracing(enableTracing);
 
+    alignByDevicePlan.deduplicate(generator);
+
     if (whereComponent != null) {
       alignByDevicePlan.setDeviceToFilterMap(
-          concatFilterByDevice(devices, whereComponent.getFilterOperator()));
+          concatFilterByDevice(alignByDevicePlan, devices, whereComponent.getFilterOperator()));
     }
 
     return alignByDevicePlan;
   }
 
+  private void checkDataTypeConsistency(TSDataType checkedDataType, MeasurementInfo measurementInfo)
+      throws QueryProcessException {
+    // check datatype consistency
+    // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
+    // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+    if (measurementInfo != null && !checkedDataType.equals(measurementInfo.getColumnDataType())) {
+      throw new QueryProcessException(AlignByDevicePlan.DATATYPE_ERROR_MESSAGE);
+    }
+  }
+
   private void convertSpecialClauseValues(QueryPlan queryPlan) {
     if (specialClauseComponent != null) {
       queryPlan.setWithoutAllNull(specialClauseComponent.isWithoutAllNull());
@@ -340,16 +340,16 @@ public class QueryOperator extends Operator {
     }
   }
 
-  private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements)
-      throws QueryProcessException {
+  private List<ResultColumn> convertSpecialClauseValues(
+      QueryPlan queryPlan, List<ResultColumn> resultColumns) throws QueryProcessException {
     convertSpecialClauseValues(queryPlan);
     // sLimit trim on the measurementColumnList
     if (specialClauseComponent.hasSlimit()) {
       int seriesSLimit = specialClauseComponent.getSeriesLimit();
       int seriesOffset = specialClauseComponent.getSeriesOffset();
-      return slimitTrimColumn(measurements, seriesSLimit, seriesOffset);
+      return slimitTrimColumn(resultColumns, seriesSLimit, seriesOffset);
     }
-    return measurements;
+    return resultColumns;
   }
 
   private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths)
@@ -390,9 +390,10 @@ public class QueryOperator extends Operator {
     return initialMeasurement;
   }
 
-  private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset)
+  private List<ResultColumn> slimitTrimColumn(
+      List<ResultColumn> resultColumns, int seriesLimit, int seriesOffset)
       throws QueryProcessException {
-    int size = columnList.size();
+    int size = resultColumns.size();
 
     // check parameter range
     if (seriesOffset >= size) {
@@ -406,14 +407,15 @@ public class QueryOperator extends Operator {
     }
 
     // trim seriesPath list
-    return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
+    return new ArrayList<>(resultColumns.subList(seriesOffset, endPosition));
   }
 
   // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
   // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
   //  root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
   private Map<String, IExpression> concatFilterByDevice(
-      List<PartialPath> devices, FilterOperator operator) throws QueryProcessException {
+      AlignByDevicePlan alignByDevicePlan, List<PartialPath> devices, FilterOperator operator)
+      throws QueryProcessException {
     Map<String, IExpression> deviceToFilterMap = new HashMap<>();
     Set<PartialPath> filterPaths = new HashSet<>();
     Iterator<PartialPath> deviceIterator = devices.iterator();
@@ -425,6 +427,7 @@ public class QueryOperator extends Operator {
         concatFilterPath(device, newOperator, filterPaths);
       } catch (LogicalOptimizeException | MetadataException e) {
         deviceIterator.remove();
+        alignByDevicePlan.removeDevice(device.getFullPath());
         continue;
       }
       // transform to a list so it can be indexed
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index a4079a1..c1c0897 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -22,11 +22,14 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class AlignByDevicePlan extends QueryPlan {
 
@@ -34,14 +37,17 @@ public class AlignByDevicePlan extends QueryPlan {
       "The paths of the SELECT clause can only be measurements or STAR.";
   public static final String ALIAS_ERROR_MESSAGE =
       "alias %s can only be matched with one time series";
+  public static final String DATATYPE_ERROR_MESSAGE =
+      "The data types of the same measurement column should be the same across devices.";
 
   // to record result measurement columns, e.g. temperature, status, speed
   private List<String> measurements;
-  private List<TSDataType> dataTypes;
   private Map<String, MeasurementInfo> measurementInfoMap;
+  private List<PartialPath> deduplicatePaths;
+  private List<String> aggregations;
 
-  // to check data type consistency for the same name sensor of different devices
-  private List<PartialPath> devices;
+  // paths index of each device that need to execute
+  private Map<String, List<Integer>> deviceToPathIndex = new LinkedHashMap<>();
   private Map<String, IExpression> deviceToFilterMap;
 
   private GroupByTimePlan groupByTimePlan;
@@ -54,7 +60,40 @@ public class AlignByDevicePlan extends QueryPlan {
 
   @Override
   public void deduplicate(PhysicalGenerator physicalGenerator) {
-    // do nothing
+    Set<PartialPath> deduplicatePaths = new LinkedHashSet<>();
+    List<String> deduplicatedAggregations = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = paths.get(i);
+      if (!deduplicatePaths.contains(path)) {
+        deduplicatePaths.add(path);
+        if (this.aggregations != null) {
+          deduplicatedAggregations.add(this.aggregations.get(i));
+        }
+        deviceToPathIndex
+            .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+            .add(deduplicatePaths.size() - 1);
+      }
+    }
+    // paths are deduplicated from here
+    this.deduplicatePaths = new ArrayList<>(deduplicatePaths);
+    setAggregations(deduplicatedAggregations);
+    this.paths = null;
+  }
+
+  public List<PartialPath> getDeduplicatePaths() {
+    return deduplicatePaths;
+  }
+
+  public void removeDevice(String device) {
+    deviceToPathIndex.remove(device);
+  }
+
+  public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
+    this.measurementInfoMap = measurementInfoMap;
+  }
+
+  public Map<String, MeasurementInfo> getMeasurementInfoMap() {
+    return measurementInfoMap;
   }
 
   public void setMeasurements(List<String> measurements) {
@@ -65,21 +104,20 @@ public class AlignByDevicePlan extends QueryPlan {
     return measurements;
   }
 
-  @Override
-  public List<TSDataType> getDataTypes() {
-    return dataTypes;
+  public List<String> getAggregations() {
+    return aggregations;
   }
 
-  public void setDataTypes(List<TSDataType> dataTypes) {
-    this.dataTypes = dataTypes;
+  public void setAggregations(List<String> aggregations) {
+    this.aggregations = aggregations.isEmpty() ? null : aggregations;
   }
 
-  public void setDevices(List<PartialPath> devices) {
-    this.devices = devices;
+  public Map<String, List<Integer>> getDeviceToPathIndex() {
+    return deviceToPathIndex;
   }
 
-  public List<PartialPath> getDevices() {
-    return devices;
+  public void setDeviceToPathIndex(Map<String, List<Integer>> deviceToPathIndex) {
+    this.deviceToPathIndex = deviceToPathIndex;
   }
 
   public Map<String, IExpression> getDeviceToFilterMap() {
@@ -116,24 +154,4 @@ public class AlignByDevicePlan extends QueryPlan {
     this.aggregationPlan = aggregationPlan;
     this.setOperatorType(Operator.OperatorType.AGGREGATION);
   }
-
-  public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
-    this.measurementInfoMap = measurementInfoMap;
-  }
-
-  public Map<String, MeasurementInfo> getMeasurementInfoMap() {
-    return measurementInfoMap;
-  }
-
-  /**
-   * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
-   * that do not exist in any device, data type is considered as String. The value is considered as
-   * null. Constant: the measurements that have quotation mark. e.g. "abc",'11'. The data type is
-   * considered as String and the value is the measurement name.
-   */
-  public enum MeasurementType {
-    Exist,
-    NonExist,
-    Constant
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
index 7c6dd17..c91493c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
@@ -18,51 +18,38 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class MeasurementInfo {
 
   public MeasurementInfo() {}
 
-  public MeasurementInfo(MeasurementType measurementType) {
-    this.measurementType = measurementType;
+  public MeasurementInfo(String measurement) {
+    this.measurement = measurement;
   }
 
+  private String measurement;
+
   // select s1, s2 as speed from root, then s2 -> speed
   private String measurementAlias;
 
-  // to record different kinds of measurement
-  private MeasurementType measurementType;
-
-  // to record the real type of the measurement, used for actual query
-  private TSDataType measurementDataType;
-
   // to record the datatype of the column in the result set
   private TSDataType columnDataType;
 
-  public void setMeasurementAlias(String measurementAlias) {
-    this.measurementAlias = measurementAlias;
+  public void setMeasurement(String measurement) {
+    this.measurement = measurement;
   }
 
-  public String getMeasurementAlias() {
-    return measurementAlias;
+  public String getMeasurement() {
+    return measurement;
   }
 
-  public void setMeasurementType(MeasurementType measurementType) {
-    this.measurementType = measurementType;
-  }
-
-  public MeasurementType getMeasurementType() {
-    return measurementType;
-  }
-
-  public void setMeasurementDataType(TSDataType measurementDataType) {
-    this.measurementDataType = measurementDataType;
+  public void setMeasurementAlias(String measurementAlias) {
+    this.measurementAlias = measurementAlias;
   }
 
-  public TSDataType getMeasurementDataType() {
-    return measurementDataType;
+  public String getMeasurementAlias() {
+    return measurementAlias;
   }
 
   public void setColumnDataType(TSDataType columnDataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index e947144..83da2c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -74,14 +74,11 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   @Override
   public void setPaths(List<PartialPath> paths) {
-    if (paths == null) this.paths = null; // align by device
-    else {
-      List<MeasurementPath> measurementPaths = new ArrayList<>();
-      for (PartialPath path : paths) {
-        measurementPaths.add((MeasurementPath) path);
-      }
-      this.paths = measurementPaths;
+    List<MeasurementPath> measurementPaths = new ArrayList<>();
+    for (PartialPath path : paths) {
+      measurementPaths.add((MeasurementPath) path);
     }
+    this.paths = measurementPaths;
   }
 
   public List<TSDataType> getDataTypes() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 620bfae..ce957cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -19,20 +19,15 @@
 package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -48,8 +43,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 
 /** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */
 public class AlignByDeviceDataSet extends QueryDataSet {
@@ -60,9 +53,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private IExpression expression;
 
   private List<String> measurements;
-  private List<PartialPath> devices;
+  private List<PartialPath> paths;
+  private List<String> aggregations;
+  private Map<String, List<Integer>> deviceToPathIndex;
   private Map<String, IExpression> deviceToFilterMap;
-  private Map<String, MeasurementInfo> measurementInfoMap;
 
   private GroupByTimePlan groupByTimePlan;
   private FillQueryPlan fillQueryPlan;
@@ -70,24 +64,26 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private RawDataQueryPlan rawDataQueryPlan;
 
   private boolean curDataSetInitialized;
-  private PartialPath currentDevice;
   private QueryDataSet currentDataSet;
-  private Iterator<PartialPath> deviceIterator;
+  private Iterator<String> deviceIterator;
+  private String currentDevice;
   private List<String> executeColumns;
   private int pathsNum = 0;
 
   public AlignByDeviceDataSet(
       AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) {
-    super(null, alignByDevicePlan.getDataTypes());
+    super(null, null);
     // align by device's column number is different from other datasets
     // TODO I don't know whether it's right or not in AlignedPath, remember to check here while
     // adapting AlignByDevice query for new vector
-    super.columnNum = alignByDevicePlan.getDataTypes().size();
+    super.columnNum = alignByDevicePlan.getMeasurements().size() + 1; // + 1 for 'device'
     this.measurements = alignByDevicePlan.getMeasurements();
-    this.devices = alignByDevicePlan.getDevices();
-    this.measurementInfoMap = alignByDevicePlan.getMeasurementInfoMap();
+    this.paths = alignByDevicePlan.getDeduplicatePaths();
+    this.aggregations = alignByDevicePlan.getAggregations();
     this.queryRouter = queryRouter;
     this.context = context;
+    this.deviceIterator = alignByDevicePlan.getDeviceToPathIndex().keySet().iterator();
+    this.deviceToPathIndex = alignByDevicePlan.getDeviceToPathIndex();
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
 
     switch (alignByDevicePlan.getOperatorType()) {
@@ -115,7 +111,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     this.curDataSetInitialized = false;
-    this.deviceIterator = devices.iterator();
   }
 
   public int getPathsNum() {
@@ -133,37 +128,22 @@ public class AlignByDeviceDataSet extends QueryDataSet {
 
     while (deviceIterator.hasNext()) {
       currentDevice = deviceIterator.next();
-      // get all measurements of current device
-      Map<String, MeasurementPath> measurementToPathMap =
-          getMeasurementsUnderGivenDevice(currentDevice);
-      Set<String> measurementOfGivenDevice = measurementToPathMap.keySet();
-
-      // extract paths and aggregations queried from all measurements
-      // executeColumns is for calculating rowRecord
       executeColumns = new ArrayList<>();
       List<PartialPath> executePaths = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) {
-        if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
-          continue;
-        }
-        String column = entry.getKey();
-        String measurement = column;
-        if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
-          measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
-          if (measurementOfGivenDevice.contains(measurement)) {
-            executeAggregations.add(column.substring(0, column.indexOf('(')));
-          }
-        }
-        if (measurementOfGivenDevice.contains(measurement)) {
-          executeColumns.add(column);
-          executePaths.add(measurementToPathMap.get(measurement));
+      for (int i : deviceToPathIndex.get(currentDevice)) {
+        executePaths.add(paths.get(i));
+        String executeColumn = paths.get(i).getMeasurement();
+        if (aggregations != null) {
+          executeAggregations.add(aggregations.get(i));
+          executeColumn = String.format("%s(%s)", aggregations.get(i), executeColumn);
         }
+        executeColumns.add(executeColumn);
       }
 
       // get filter to execute for the current device
       if (deviceToFilterMap != null) {
-        this.expression = deviceToFilterMap.get(currentDevice.getFullPath());
+        this.expression = deviceToFilterMap.get(currentDevice);
       }
 
       // for tracing: try to calculate the number of series paths
@@ -219,23 +199,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     return false;
   }
 
-  /** Get all measurements under given device. */
-  protected Map<String, MeasurementPath> getMeasurementsUnderGivenDevice(PartialPath device)
-      throws IOException {
-    try {
-      // TODO: Implement this method in Cluster MManager
-      Map<String, MeasurementPath> measurementToPathMap = new HashMap<>();
-      List<MeasurementPath> measurementPaths =
-          IoTDB.metaManager.getAllMeasurementByDevicePath(device);
-      for (MeasurementPath measurementPath : measurementPaths) {
-        measurementToPathMap.put(measurementPath.getMeasurement(), measurementPath);
-      }
-      return measurementToPathMap;
-    } catch (MetadataException e) {
-      throw new IOException("Cannot get node from " + device, e);
-    }
-  }
-
   @Override
   public RowRecord nextWithoutConstraint() throws IOException {
     RowRecord originRowRecord = currentDataSet.next();
@@ -243,7 +206,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     RowRecord rowRecord = new RowRecord(originRowRecord.getTimestamp());
 
     Field deviceField = new Field(TSDataType.TEXT);
-    deviceField.setBinaryV(new Binary(currentDevice.getFullPath()));
+    deviceField.setBinaryV(new Binary(currentDevice));
     rowRecord.addField(deviceField);
     // device field should not be considered as a value field it should affect the WITHOUT NULL
     // judgement
@@ -256,23 +219,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     for (String measurement : measurements) {
-      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
-      switch (measurementInfo.getMeasurementType()) {
-        case Exist:
-          if (currentColumnMap.get(measurement) != null) {
-            rowRecord.addField(currentColumnMap.get(measurement));
-          } else {
-            rowRecord.addField(new Field(null));
-          }
-          break;
-        case NonExist:
-          rowRecord.addField(new Field(null));
-          break;
-        case Constant:
-          Field res = new Field(TSDataType.TEXT);
-          res.setBinaryV(Binary.valueOf(measurement));
-          rowRecord.addField(res);
-          break;
+      if (currentColumnMap.get(measurement) != null) {
+        rowRecord.addField(currentColumnMap.get(measurement));
+      } else {
+        rowRecord.addField(new Field(null));
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 505c5a9..53fcb0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -883,42 +883,27 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
 
     // get column types and do deduplication
     columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result
-    List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
-    deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
 
     Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
-    Map<String, MeasurementInfo> measurementInfoMap = plan.getMeasurementInfoMap();
-
     // build column header with constant and non exist column and deduplication
     List<String> measurements = plan.getMeasurements();
     for (String measurement : measurements) {
-      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
+      MeasurementInfo measurementInfo = plan.getMeasurementInfoMap().get(measurement);
       TSDataType type = TSDataType.TEXT;
-      switch (measurementInfo.getMeasurementType()) {
-        case Exist:
-          type = measurementInfo.getColumnDataType();
-          break;
-        case NonExist:
-        case Constant:
-          type = TSDataType.TEXT;
+      String measurementName = measurement;
+      if (measurementInfo != null) {
+        type = measurementInfo.getColumnDataType();
+        measurementName = measurementInfo.getMeasurementAlias();
       }
-      String measurementAlias = measurementInfo.getMeasurementAlias();
-      respColumns.add(measurementAlias != null ? measurementAlias : measurement);
+      respColumns.add(measurementName != null ? measurementName : measurement);
       columnTypes.add(type.toString());
 
-      if (!deduplicatedMeasurements.contains(measurement)) {
-        deduplicatedMeasurements.add(measurement);
-        deduplicatedColumnsType.add(type);
-      }
+      deduplicatedMeasurements.add(measurement);
     }
 
     // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
     // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
     plan.setMeasurements(new ArrayList<>(deduplicatedMeasurements));
-    plan.setDataTypes(deduplicatedColumnsType);
-
-    // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
-    plan.setPaths(null);
   }
 
   private TSExecuteStatementResp executeSelectIntoStatement(
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 7467407..4e99c9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -330,11 +330,8 @@ public class IoTDBAlignByDeviceIT {
           "103,root.vehicle.d0,199,null,",
           "104,root.vehicle.d0,190,null,",
           "105,root.vehicle.d0,199,11.11,",
-          "106,root.vehicle.d0,null,null,",
           "1000,root.vehicle.d0,55555,1000.11,",
           "946684800000,root.vehicle.d0,100,null,",
-          "1,root.vehicle.d1,null,null,",
-          "1000,root.vehicle.d1,null,null,",
         };
 
     Class.forName(Config.JDBC_DRIVER_NAME);
@@ -371,7 +368,7 @@ public class IoTDBAlignByDeviceIT {
           Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
           cnt++;
         }
-        Assert.assertEquals(16, cnt);
+        Assert.assertEquals(13, cnt);
       }
     } catch (Exception e) {
       e.printStackTrace();