You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/19 10:32:17 UTC

[iotdb] 01/01: recomplete align by device

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b297a99f8de18712470552e32ddcd568be36bc7a
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Nov 19 18:27:24 2021 +0800

    recomplete align by device
---
 .../iotdb/db/qp/logical/crud/QueryOperator.java    | 127 +++++++++++----------
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  86 ++++++++------
 .../iotdb/db/qp/physical/crud/MeasurementInfo.java |  37 ++----
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |  11 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |  98 ++++------------
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  29 ++---
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |   5 +-
 7 files changed, 165 insertions(+), 228 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 0d2fb81..c4b5040 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -173,8 +172,18 @@ public class QueryOperator extends Operator {
   }
 
   public void check() throws LogicalOperatorException {
-    if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) {
-      throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries.");
+    if (isAlignByDevice()) {
+      if (selectComponent.hasTimeSeriesGeneratingFunction()) {
+        throw new LogicalOperatorException(
+            "ALIGN BY DEVICE clause is not supported in UDF queries.");
+      }
+
+      for (PartialPath path : selectComponent.getPaths()) {
+        String device = path.getDevice();
+        if (!device.isEmpty()) {
+          throw new LogicalOperatorException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
+        }
+      }
     }
   }
 
@@ -230,74 +239,54 @@ public class QueryOperator extends Operator {
       throws QueryProcessException {
     AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
 
-    List<PartialPath> prefixPaths = fromComponent.getPrefixPaths();
     // remove stars in fromPaths and get deviceId with deduplication
-    List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
-    List<ResultColumn> resultColumns = selectComponent.getResultColumns();
+    List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths());
+    List<ResultColumn> resultColumns =
+        convertSpecialClauseValues(alignByDevicePlan, selectComponent.getResultColumns());
     List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
     // to record result measurement columns
     List<String> measurements = new ArrayList<>();
     Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
     List<PartialPath> paths = new ArrayList<>();
+    List<String> aggregations = new ArrayList<>();
 
-    for (int i = 0; i < resultColumns.size(); i++) { // per suffix in SELECT
+    // per suffix in SELECT
+    for (int i = 0; i < resultColumns.size(); i++) {
       ResultColumn resultColumn = resultColumns.get(i);
-      Expression suffixExpression = resultColumn.getExpression();
-      PartialPath suffixPath = getSuffixPathFromExpression(suffixExpression);
+      PartialPath suffixPath = getSuffixPathFromExpression(resultColumn.getExpression());
       String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null;
-
       // to record measurements in the loop of a suffix path
       Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+      // concat suffix with per device
       for (PartialPath device : devices) {
         PartialPath fullPath = device.concatPath(suffixPath);
         try {
           // remove stars in SELECT to get actual paths
           List<MeasurementPath> actualPaths = getMatchedTimeseries(fullPath);
-          if (suffixPath.getNodes().length > 1) {
-            throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
-          }
           if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
             throw new QueryProcessException(
                 String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias()));
           }
-          if (actualPaths.isEmpty()) {
-            String nonExistMeasurement = getMeasurementName(fullPath, aggregation);
-            if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
-              measurementInfoMap.putIfAbsent(
-                  nonExistMeasurement, new MeasurementInfo(MeasurementType.NonExist));
+          for (MeasurementPath path : actualPaths) {
+            MeasurementInfo measurementInfo =
+                new MeasurementInfo(getMeasurementName(path, aggregation));
+            TSDataType columnDataType = path.getSeriesType();
+            if (aggregation != null) {
+              columnDataType = getAggregationType(aggregation);
+              aggregations.add(aggregation);
             }
-          } else {
-            for (PartialPath path : actualPaths) {
-              String measurementName = getMeasurementName(path, aggregation);
-              TSDataType measurementDataType = path.getSeriesType();
-              TSDataType columnDataType = getAggregationType(aggregation);
-              columnDataType = columnDataType == null ? measurementDataType : columnDataType;
-              MeasurementInfo measurementInfo =
-                  measurementInfoMap.getOrDefault(measurementName, new MeasurementInfo());
-
-              if (resultColumn.hasAlias()) {
-                measurementInfo.setMeasurementAlias(resultColumn.getAlias());
-              }
-
-              // check datatype consistency
-              // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
-              // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
-              if (measurementInfo.getColumnDataType() != null) {
-                if (!columnDataType.equals(measurementInfo.getColumnDataType())) {
-                  throw new QueryProcessException(
-                      "The data types of the same measurement column should be the same across devices.");
-                }
-              } else {
-                measurementInfo.setColumnDataType(columnDataType);
-                measurementInfo.setMeasurementDataType(measurementDataType);
-              }
-
-              measurementSetOfGivenSuffix.add(measurementName);
-              measurementInfo.setMeasurementType(MeasurementType.Exist);
-              measurementInfoMap.put(measurementName, measurementInfo);
-              // update paths
-              paths.add(path);
+            checkDataTypeConsistency(
+                columnDataType, measurementInfoMap.get(measurementInfo.getMeasurement()));
+
+            if (!measurementInfoMap.containsKey(measurementInfo.getMeasurement())) {
+              measurementInfo.setMeasurementAlias(
+                  resultColumn.hasAlias() ? resultColumn.getAlias() : null);
+              measurementInfo.setColumnDataType(columnDataType);
+              measurementInfoMap.put(measurementInfo.getMeasurement(), measurementInfo);
             }
+            measurementSetOfGivenSuffix.add(measurementInfo.getMeasurement());
+            paths.add(path);
           }
         } catch (MetadataException | QueryProcessException e) {
           throw new QueryProcessException(e.getMessage());
@@ -313,22 +302,33 @@ public class QueryOperator extends Operator {
       measurements.addAll(measurementSetOfGivenSuffix);
     }
 
-    List<String> trimMeasurements = convertSpecialClauseValues(alignByDevicePlan, measurements);
     // assigns to alignByDevicePlan
-    alignByDevicePlan.setMeasurements(trimMeasurements);
-    alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
-    alignByDevicePlan.setDevices(devices);
+    alignByDevicePlan.setMeasurements(measurements);
     alignByDevicePlan.setPaths(paths);
+    alignByDevicePlan.setAggregations(aggregations);
+    alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
     alignByDevicePlan.setEnableTracing(enableTracing);
 
+    alignByDevicePlan.deduplicate(generator);
+
     if (whereComponent != null) {
       alignByDevicePlan.setDeviceToFilterMap(
-          concatFilterByDevice(devices, whereComponent.getFilterOperator()));
+          concatFilterByDevice(alignByDevicePlan, devices, whereComponent.getFilterOperator()));
     }
 
     return alignByDevicePlan;
   }
 
+  private void checkDataTypeConsistency(TSDataType checkedDataType, MeasurementInfo measurementInfo)
+      throws QueryProcessException {
+    // check datatype consistency
+    // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
+    // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+    if (measurementInfo != null && !checkedDataType.equals(measurementInfo.getColumnDataType())) {
+      throw new QueryProcessException(AlignByDevicePlan.DATATYPE_ERROR_MESSAGE);
+    }
+  }
+
   private void convertSpecialClauseValues(QueryPlan queryPlan) {
     if (specialClauseComponent != null) {
       queryPlan.setWithoutAllNull(specialClauseComponent.isWithoutAllNull());
@@ -340,16 +340,16 @@ public class QueryOperator extends Operator {
     }
   }
 
-  private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements)
-      throws QueryProcessException {
+  private List<ResultColumn> convertSpecialClauseValues(
+      QueryPlan queryPlan, List<ResultColumn> resultColumns) throws QueryProcessException {
     convertSpecialClauseValues(queryPlan);
     // sLimit trim on the measurementColumnList
     if (specialClauseComponent.hasSlimit()) {
       int seriesSLimit = specialClauseComponent.getSeriesLimit();
       int seriesOffset = specialClauseComponent.getSeriesOffset();
-      return slimitTrimColumn(measurements, seriesSLimit, seriesOffset);
+      return slimitTrimColumn(resultColumns, seriesSLimit, seriesOffset);
     }
-    return measurements;
+    return resultColumns;
   }
 
   private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths)
@@ -390,9 +390,10 @@ public class QueryOperator extends Operator {
     return initialMeasurement;
   }
 
-  private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset)
+  private List<ResultColumn> slimitTrimColumn(
+      List<ResultColumn> resultColumns, int seriesLimit, int seriesOffset)
       throws QueryProcessException {
-    int size = columnList.size();
+    int size = resultColumns.size();
 
     // check parameter range
     if (seriesOffset >= size) {
@@ -406,14 +407,15 @@ public class QueryOperator extends Operator {
     }
 
     // trim seriesPath list
-    return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
+    return new ArrayList<>(resultColumns.subList(seriesOffset, endPosition));
   }
 
   // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
   // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
   //  root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
   private Map<String, IExpression> concatFilterByDevice(
-      List<PartialPath> devices, FilterOperator operator) throws QueryProcessException {
+      AlignByDevicePlan alignByDevicePlan, List<PartialPath> devices, FilterOperator operator)
+      throws QueryProcessException {
     Map<String, IExpression> deviceToFilterMap = new HashMap<>();
     Set<PartialPath> filterPaths = new HashSet<>();
     Iterator<PartialPath> deviceIterator = devices.iterator();
@@ -425,6 +427,7 @@ public class QueryOperator extends Operator {
         concatFilterPath(device, newOperator, filterPaths);
       } catch (LogicalOptimizeException | MetadataException e) {
         deviceIterator.remove();
+        alignByDevicePlan.removeDevice(device.getFullPath());
         continue;
       }
       // transform to a list so it can be indexed
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index a4079a1..c1c0897 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -22,11 +22,14 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class AlignByDevicePlan extends QueryPlan {
 
@@ -34,14 +37,17 @@ public class AlignByDevicePlan extends QueryPlan {
       "The paths of the SELECT clause can only be measurements or STAR.";
   public static final String ALIAS_ERROR_MESSAGE =
       "alias %s can only be matched with one time series";
+  public static final String DATATYPE_ERROR_MESSAGE =
+      "The data types of the same measurement column should be the same across devices.";
 
   // to record result measurement columns, e.g. temperature, status, speed
   private List<String> measurements;
-  private List<TSDataType> dataTypes;
   private Map<String, MeasurementInfo> measurementInfoMap;
+  private List<PartialPath> deduplicatePaths;
+  private List<String> aggregations;
 
-  // to check data type consistency for the same name sensor of different devices
-  private List<PartialPath> devices;
+  // paths index of each device that need to execute
+  private Map<String, List<Integer>> deviceToPathIndex = new LinkedHashMap<>();
   private Map<String, IExpression> deviceToFilterMap;
 
   private GroupByTimePlan groupByTimePlan;
@@ -54,7 +60,40 @@ public class AlignByDevicePlan extends QueryPlan {
 
   @Override
   public void deduplicate(PhysicalGenerator physicalGenerator) {
-    // do nothing
+    Set<PartialPath> deduplicatePaths = new LinkedHashSet<>();
+    List<String> deduplicatedAggregations = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = paths.get(i);
+      if (!deduplicatePaths.contains(path)) {
+        deduplicatePaths.add(path);
+        if (this.aggregations != null) {
+          deduplicatedAggregations.add(this.aggregations.get(i));
+        }
+        deviceToPathIndex
+            .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+            .add(deduplicatePaths.size() - 1);
+      }
+    }
+    // paths are deduplicated from here
+    this.deduplicatePaths = new ArrayList<>(deduplicatePaths);
+    setAggregations(deduplicatedAggregations);
+    this.paths = null;
+  }
+
+  public List<PartialPath> getDeduplicatePaths() {
+    return deduplicatePaths;
+  }
+
+  public void removeDevice(String device) {
+    deviceToPathIndex.remove(device);
+  }
+
+  public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
+    this.measurementInfoMap = measurementInfoMap;
+  }
+
+  public Map<String, MeasurementInfo> getMeasurementInfoMap() {
+    return measurementInfoMap;
   }
 
   public void setMeasurements(List<String> measurements) {
@@ -65,21 +104,20 @@ public class AlignByDevicePlan extends QueryPlan {
     return measurements;
   }
 
-  @Override
-  public List<TSDataType> getDataTypes() {
-    return dataTypes;
+  public List<String> getAggregations() {
+    return aggregations;
   }
 
-  public void setDataTypes(List<TSDataType> dataTypes) {
-    this.dataTypes = dataTypes;
+  public void setAggregations(List<String> aggregations) {
+    this.aggregations = aggregations.isEmpty() ? null : aggregations;
   }
 
-  public void setDevices(List<PartialPath> devices) {
-    this.devices = devices;
+  public Map<String, List<Integer>> getDeviceToPathIndex() {
+    return deviceToPathIndex;
   }
 
-  public List<PartialPath> getDevices() {
-    return devices;
+  public void setDeviceToPathIndex(Map<String, List<Integer>> deviceToPathIndex) {
+    this.deviceToPathIndex = deviceToPathIndex;
   }
 
   public Map<String, IExpression> getDeviceToFilterMap() {
@@ -116,24 +154,4 @@ public class AlignByDevicePlan extends QueryPlan {
     this.aggregationPlan = aggregationPlan;
     this.setOperatorType(Operator.OperatorType.AGGREGATION);
   }
-
-  public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
-    this.measurementInfoMap = measurementInfoMap;
-  }
-
-  public Map<String, MeasurementInfo> getMeasurementInfoMap() {
-    return measurementInfoMap;
-  }
-
-  /**
-   * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
-   * that do not exist in any device, data type is considered as String. The value is considered as
-   * null. Constant: the measurements that have quotation mark. e.g. "abc",'11'. The data type is
-   * considered as String and the value is the measurement name.
-   */
-  public enum MeasurementType {
-    Exist,
-    NonExist,
-    Constant
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
index 7c6dd17..c91493c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
@@ -18,51 +18,38 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class MeasurementInfo {
 
   public MeasurementInfo() {}
 
-  public MeasurementInfo(MeasurementType measurementType) {
-    this.measurementType = measurementType;
+  public MeasurementInfo(String measurement) {
+    this.measurement = measurement;
   }
 
+  private String measurement;
+
   // select s1, s2 as speed from root, then s2 -> speed
   private String measurementAlias;
 
-  // to record different kinds of measurement
-  private MeasurementType measurementType;
-
-  // to record the real type of the measurement, used for actual query
-  private TSDataType measurementDataType;
-
   // to record the datatype of the column in the result set
   private TSDataType columnDataType;
 
-  public void setMeasurementAlias(String measurementAlias) {
-    this.measurementAlias = measurementAlias;
+  public void setMeasurement(String measurement) {
+    this.measurement = measurement;
   }
 
-  public String getMeasurementAlias() {
-    return measurementAlias;
+  public String getMeasurement() {
+    return measurement;
   }
 
-  public void setMeasurementType(MeasurementType measurementType) {
-    this.measurementType = measurementType;
-  }
-
-  public MeasurementType getMeasurementType() {
-    return measurementType;
-  }
-
-  public void setMeasurementDataType(TSDataType measurementDataType) {
-    this.measurementDataType = measurementDataType;
+  public void setMeasurementAlias(String measurementAlias) {
+    this.measurementAlias = measurementAlias;
   }
 
-  public TSDataType getMeasurementDataType() {
-    return measurementDataType;
+  public String getMeasurementAlias() {
+    return measurementAlias;
   }
 
   public void setColumnDataType(TSDataType columnDataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index e947144..83da2c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -74,14 +74,11 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   @Override
   public void setPaths(List<PartialPath> paths) {
-    if (paths == null) this.paths = null; // align by device
-    else {
-      List<MeasurementPath> measurementPaths = new ArrayList<>();
-      for (PartialPath path : paths) {
-        measurementPaths.add((MeasurementPath) path);
-      }
-      this.paths = measurementPaths;
+    List<MeasurementPath> measurementPaths = new ArrayList<>();
+    for (PartialPath path : paths) {
+      measurementPaths.add((MeasurementPath) path);
     }
+    this.paths = measurementPaths;
   }
 
   public List<TSDataType> getDataTypes() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 620bfae..ce957cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -19,20 +19,15 @@
 package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -48,8 +43,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 
 /** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */
 public class AlignByDeviceDataSet extends QueryDataSet {
@@ -60,9 +53,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private IExpression expression;
 
   private List<String> measurements;
-  private List<PartialPath> devices;
+  private List<PartialPath> paths;
+  private List<String> aggregations;
+  private Map<String, List<Integer>> deviceToPathIndex;
   private Map<String, IExpression> deviceToFilterMap;
-  private Map<String, MeasurementInfo> measurementInfoMap;
 
   private GroupByTimePlan groupByTimePlan;
   private FillQueryPlan fillQueryPlan;
@@ -70,24 +64,26 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private RawDataQueryPlan rawDataQueryPlan;
 
   private boolean curDataSetInitialized;
-  private PartialPath currentDevice;
   private QueryDataSet currentDataSet;
-  private Iterator<PartialPath> deviceIterator;
+  private Iterator<String> deviceIterator;
+  private String currentDevice;
   private List<String> executeColumns;
   private int pathsNum = 0;
 
   public AlignByDeviceDataSet(
       AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) {
-    super(null, alignByDevicePlan.getDataTypes());
+    super(null, null);
     // align by device's column number is different from other datasets
     // TODO I don't know whether it's right or not in AlignedPath, remember to check here while
     // adapting AlignByDevice query for new vector
-    super.columnNum = alignByDevicePlan.getDataTypes().size();
+    super.columnNum = alignByDevicePlan.getMeasurements().size() + 1; // + 1 for 'device'
     this.measurements = alignByDevicePlan.getMeasurements();
-    this.devices = alignByDevicePlan.getDevices();
-    this.measurementInfoMap = alignByDevicePlan.getMeasurementInfoMap();
+    this.paths = alignByDevicePlan.getDeduplicatePaths();
+    this.aggregations = alignByDevicePlan.getAggregations();
     this.queryRouter = queryRouter;
     this.context = context;
+    this.deviceIterator = alignByDevicePlan.getDeviceToPathIndex().keySet().iterator();
+    this.deviceToPathIndex = alignByDevicePlan.getDeviceToPathIndex();
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
 
     switch (alignByDevicePlan.getOperatorType()) {
@@ -115,7 +111,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     this.curDataSetInitialized = false;
-    this.deviceIterator = devices.iterator();
   }
 
   public int getPathsNum() {
@@ -133,37 +128,22 @@ public class AlignByDeviceDataSet extends QueryDataSet {
 
     while (deviceIterator.hasNext()) {
       currentDevice = deviceIterator.next();
-      // get all measurements of current device
-      Map<String, MeasurementPath> measurementToPathMap =
-          getMeasurementsUnderGivenDevice(currentDevice);
-      Set<String> measurementOfGivenDevice = measurementToPathMap.keySet();
-
-      // extract paths and aggregations queried from all measurements
-      // executeColumns is for calculating rowRecord
       executeColumns = new ArrayList<>();
       List<PartialPath> executePaths = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) {
-        if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
-          continue;
-        }
-        String column = entry.getKey();
-        String measurement = column;
-        if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
-          measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
-          if (measurementOfGivenDevice.contains(measurement)) {
-            executeAggregations.add(column.substring(0, column.indexOf('(')));
-          }
-        }
-        if (measurementOfGivenDevice.contains(measurement)) {
-          executeColumns.add(column);
-          executePaths.add(measurementToPathMap.get(measurement));
+      for (int i : deviceToPathIndex.get(currentDevice)) {
+        executePaths.add(paths.get(i));
+        String executeColumn = paths.get(i).getMeasurement();
+        if (aggregations != null) {
+          executeAggregations.add(aggregations.get(i));
+          executeColumn = String.format("%s(%s)", aggregations.get(i), executeColumn);
         }
+        executeColumns.add(executeColumn);
       }
 
       // get filter to execute for the current device
       if (deviceToFilterMap != null) {
-        this.expression = deviceToFilterMap.get(currentDevice.getFullPath());
+        this.expression = deviceToFilterMap.get(currentDevice);
       }
 
       // for tracing: try to calculate the number of series paths
@@ -219,23 +199,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     return false;
   }
 
-  /** Get all measurements under given device. */
-  protected Map<String, MeasurementPath> getMeasurementsUnderGivenDevice(PartialPath device)
-      throws IOException {
-    try {
-      // TODO: Implement this method in Cluster MManager
-      Map<String, MeasurementPath> measurementToPathMap = new HashMap<>();
-      List<MeasurementPath> measurementPaths =
-          IoTDB.metaManager.getAllMeasurementByDevicePath(device);
-      for (MeasurementPath measurementPath : measurementPaths) {
-        measurementToPathMap.put(measurementPath.getMeasurement(), measurementPath);
-      }
-      return measurementToPathMap;
-    } catch (MetadataException e) {
-      throw new IOException("Cannot get node from " + device, e);
-    }
-  }
-
   @Override
   public RowRecord nextWithoutConstraint() throws IOException {
     RowRecord originRowRecord = currentDataSet.next();
@@ -243,7 +206,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     RowRecord rowRecord = new RowRecord(originRowRecord.getTimestamp());
 
     Field deviceField = new Field(TSDataType.TEXT);
-    deviceField.setBinaryV(new Binary(currentDevice.getFullPath()));
+    deviceField.setBinaryV(new Binary(currentDevice));
     rowRecord.addField(deviceField);
     // device field should not be considered as a value field it should affect the WITHOUT NULL
     // judgement
@@ -256,23 +219,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     for (String measurement : measurements) {
-      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
-      switch (measurementInfo.getMeasurementType()) {
-        case Exist:
-          if (currentColumnMap.get(measurement) != null) {
-            rowRecord.addField(currentColumnMap.get(measurement));
-          } else {
-            rowRecord.addField(new Field(null));
-          }
-          break;
-        case NonExist:
-          rowRecord.addField(new Field(null));
-          break;
-        case Constant:
-          Field res = new Field(TSDataType.TEXT);
-          res.setBinaryV(Binary.valueOf(measurement));
-          rowRecord.addField(res);
-          break;
+      if (currentColumnMap.get(measurement) != null) {
+        rowRecord.addField(currentColumnMap.get(measurement));
+      } else {
+        rowRecord.addField(new Field(null));
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 505c5a9..53fcb0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -883,42 +883,27 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
 
     // get column types and do deduplication
     columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result
-    List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
-    deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
 
     Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
-    Map<String, MeasurementInfo> measurementInfoMap = plan.getMeasurementInfoMap();
-
     // build column header with constant and non exist column and deduplication
     List<String> measurements = plan.getMeasurements();
     for (String measurement : measurements) {
-      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
+      MeasurementInfo measurementInfo = plan.getMeasurementInfoMap().get(measurement);
       TSDataType type = TSDataType.TEXT;
-      switch (measurementInfo.getMeasurementType()) {
-        case Exist:
-          type = measurementInfo.getColumnDataType();
-          break;
-        case NonExist:
-        case Constant:
-          type = TSDataType.TEXT;
+      String measurementName = measurement;
+      if (measurementInfo != null) {
+        type = measurementInfo.getColumnDataType();
+        measurementName = measurementInfo.getMeasurementAlias();
       }
-      String measurementAlias = measurementInfo.getMeasurementAlias();
-      respColumns.add(measurementAlias != null ? measurementAlias : measurement);
+      respColumns.add(measurementName != null ? measurementName : measurement);
       columnTypes.add(type.toString());
 
-      if (!deduplicatedMeasurements.contains(measurement)) {
-        deduplicatedMeasurements.add(measurement);
-        deduplicatedColumnsType.add(type);
-      }
+      deduplicatedMeasurements.add(measurement);
     }
 
     // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
     // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
     plan.setMeasurements(new ArrayList<>(deduplicatedMeasurements));
-    plan.setDataTypes(deduplicatedColumnsType);
-
-    // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
-    plan.setPaths(null);
   }
 
   private TSExecuteStatementResp executeSelectIntoStatement(
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 7467407..4e99c9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -330,11 +330,8 @@ public class IoTDBAlignByDeviceIT {
           "103,root.vehicle.d0,199,null,",
           "104,root.vehicle.d0,190,null,",
           "105,root.vehicle.d0,199,11.11,",
-          "106,root.vehicle.d0,null,null,",
           "1000,root.vehicle.d0,55555,1000.11,",
           "946684800000,root.vehicle.d0,100,null,",
-          "1,root.vehicle.d1,null,null,",
-          "1000,root.vehicle.d1,null,null,",
         };
 
     Class.forName(Config.JDBC_DRIVER_NAME);
@@ -371,7 +368,7 @@ public class IoTDBAlignByDeviceIT {
           Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
           cnt++;
         }
-        Assert.assertEquals(16, cnt);
+        Assert.assertEquals(13, cnt);
       }
     } catch (Exception e) {
       e.printStackTrace();