You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/13 10:57:34 UTC

[iotdb] 02/04: refactor analyzer

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/mppSelectInto
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4904bea0ad14151e73ac4a3ebe10d1432b73728b
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Oct 13 16:03:29 2022 +0800

    refactor analyzer
---
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  31 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 398 +++------------------
 .../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 ++++++
 .../parameter/IntoDeviceMeasurementDescriptor.java |  57 +++
 .../planner/plan/parameter/IntoPathDescriptor.java |  60 ++++
 .../plan/statement/component/IntoComponent.java    | 188 +++++++++-
 6 files changed, 479 insertions(+), 360 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 119f67654a..ea344ca9fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PatternTreeMap;
-import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.NodeRef;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -33,6 +31,8 @@ import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoDeviceMeasurementDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -140,12 +140,14 @@ public class Analysis {
   private DatasetHeader respDatasetHeader;
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
-  // SELECT INTO Analysis (used in ALIGN BY DEVICE)
+  // SELECT INTO Analysis
   /////////////////////////////////////////////////////////////////////////////////////////////////
 
-  private PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap;
+  // used in ALIGN BY DEVICE
+  private IntoDeviceMeasurementDescriptor intoDeviceMeasurementDescriptor;
 
-  private Map<PartialPath, Boolean> intoDeviceToAlignedMap;
+  // used in ALIGN BY TIME
+  private IntoPathDescriptor intoPathDescriptor;
 
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // Schema Query Analysis
@@ -414,21 +416,20 @@ public class Analysis {
     this.deviceViewOutputExpressions = deviceViewOutputExpressions;
   }
 
-  public PatternTreeMap<String, PatternTreeMapFactory.StringSerializer>
-      getIntoPathPatternTreeMap() {
-    return intoPathPatternTreeMap;
+  public IntoDeviceMeasurementDescriptor getIntoDeviceMeasurementDescriptor() {
+    return intoDeviceMeasurementDescriptor;
   }
 
-  public void setIntoPathPatternTreeMap(
-      PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap) {
-    this.intoPathPatternTreeMap = intoPathPatternTreeMap;
+  public void setIntoDeviceMeasurementDescriptor(
+      IntoDeviceMeasurementDescriptor intoDeviceMeasurementDescriptor) {
+    this.intoDeviceMeasurementDescriptor = intoDeviceMeasurementDescriptor;
   }
 
-  public Map<PartialPath, Boolean> getIntoDeviceToAlignedMap() {
-    return intoDeviceToAlignedMap;
+  public IntoPathDescriptor getIntoPathDescriptor() {
+    return intoPathDescriptor;
   }
 
-  public void setIntoDeviceToAlignedMap(Map<PartialPath, Boolean> intoDeviceToAlignedMap) {
-    this.intoDeviceToAlignedMap = intoDeviceToAlignedMap;
+  public void setIntoPathDescriptor(IntoPathDescriptor intoPathDescriptor) {
+    this.intoPathDescriptor = intoPathDescriptor;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index f28f1064fb..dcd90bf3fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
@@ -39,7 +38,6 @@ import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeExcept
 import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
-import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
@@ -54,6 +52,8 @@ import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoDeviceMeasurementDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
@@ -61,7 +61,6 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.IntoItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
@@ -142,16 +141,16 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
 import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetDevice;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetMeasurement;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetPath;
 
 /** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
 public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> {
@@ -1000,368 +999,85 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       QueryStatement queryStatement,
       Set<PartialPath> deviceSet,
       List<Pair<Expression, String>> outputExpressions) {
-    List<Expression> outputColumns =
+    if (!queryStatement.isSelectInto()) {
+      return;
+    }
+
+    List<PartialPath> sourceDevices = new ArrayList<>(deviceSet);
+    List<Expression> sourceColumns =
         outputExpressions.stream()
             .map(Pair::getLeft)
             .collect(Collectors.toCollection(ArrayList::new));
-    boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(outputColumns);
 
     IntoComponent intoComponent = queryStatement.getIntoComponent();
-    List<IntoItem> intoItems = intoComponent.getIntoItems();
-
-    List<PartialPath> sourceDevices = new ArrayList<>(deviceSet);
-    List<PartialPath> targetDevices = new ArrayList<>(sourceDevices.size());
-    if (intoComponent.isDeviceExistPlaceholder()) {
-      if (intoItems.size() > 1) {
-        throw new SemanticException("");
-      }
-
-      PartialPath deviceTemplate = intoItems.get(0).getIntoDevice();
-      for (PartialPath sourceDevice : sourceDevices) {
-        targetDevices.add(constructIntoDevice(sourceDevice, deviceTemplate));
-      }
-    } else {
-      if (intoItems.size() != sourceDevices.size()) {
-        throw new SemanticException("");
-      }
-
-      for (IntoItem intoItem : intoItems) {
-        targetDevices.add(intoItem.getIntoDevice());
-      }
-    }
-
-    PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap =
-        PatternTreeMapFactory.getIntoPathPatternTreeMap();
-
-    if (intoComponent.isMeasurementsExistPlaceholder()) {
-      if (!isAllRawSeriesQuery) {
-        throw new SemanticException("");
-      }
-
-      if (intoComponent.isDeviceExistPlaceholder()) {
-        String measurementTemplate = intoItems.get(0).getIntoMeasurements().get(0);
-        for (int i = 0; i < targetDevices.size(); i++) {
-          PartialPath targetDevice = targetDevices.get(i);
-          for (Expression outputColumn : outputColumns) {
-            PartialPath intoPath =
-                targetDevice.concatNode(
-                    constructIntoMeasurement(
-                        sourceDevices.get(i).concatNode(outputColumn.toString()),
-                        measurementTemplate));
-            intoPathPatternTreeMap.append(intoPath, outputColumn.toString());
-            if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
-              throw new SemanticException(
-                  "select into: target paths in into clause should be different.");
-            }
-          }
-        }
-      } else {
-        for (int deviceIndex = 0; deviceIndex < sourceDevices.size(); deviceIndex++) {
-          IntoItem intoItem = intoItems.get(deviceIndex);
-          List<String> intoMeasurements = intoItem.getIntoMeasurements();
-          PartialPath targetDevice = targetDevices.get(deviceIndex);
-
-          if (intoItem.isMeasurementsExistPlaceholder()) {
-            if (intoMeasurements.size() > 1) {
-              throw new SemanticException("");
-            }
-
-            String measurementTemplate = intoMeasurements.get(0);
-            for (Expression outputColumn : outputColumns) {
-              PartialPath intoPath =
-                  targetDevice.concatNode(
-                      constructIntoMeasurement(
-                          sourceDevices.get(deviceIndex).concatNode(outputColumn.toString()),
-                          measurementTemplate));
-              intoPathPatternTreeMap.append(intoPath, outputColumn.toString());
-              if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
-                throw new SemanticException(
-                    "select into: target paths in into clause should be different.");
-              }
-            }
-          } else {
-            if (intoMeasurements.size() != outputColumns.size()) {
-              throw new SemanticException(
-                  "select into: the number of source columns and the number of target paths should be the same.");
-            }
-
-            for (int measurementIndex = 0;
-                measurementIndex < intoMeasurements.size();
-                measurementIndex++) {
-              PartialPath intoPath =
-                  targetDevice.concatNode(intoMeasurements.get(measurementIndex));
-              intoPathPatternTreeMap.append(
-                  intoPath, outputColumns.get(measurementIndex).toString());
-              if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
-                throw new SemanticException(
-                    "select into: target paths in into clause should be different.");
-              }
-            }
-          }
-        }
-      }
-    } else {
-      for (int deviceIndex = 0; deviceIndex < sourceDevices.size(); deviceIndex++) {
-        IntoItem intoItem = intoItems.get(deviceIndex);
-        List<String> intoMeasurements = intoItem.getIntoMeasurements();
-        if (intoMeasurements.size() != outputColumns.size()) {
-          throw new SemanticException(
-              "select into: the number of source columns and the number of target paths should be the same.");
-        }
-
-        PartialPath targetDevice = targetDevices.get(deviceIndex);
-        for (int measurementIndex = 0;
-            measurementIndex < intoMeasurements.size();
-            measurementIndex++) {
-          PartialPath intoPath = targetDevice.concatNode(intoMeasurements.get(measurementIndex));
-          intoPathPatternTreeMap.append(intoPath, outputColumns.get(measurementIndex).toString());
-          if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
-            throw new SemanticException(
-                "select into: target paths in into clause should be different.");
-          }
-        }
-      }
-    }
-
-    if (isAllRawSeriesQuery) {
-      if (intoComponent.isDeviceExistPlaceholder()) {
-        if (intoComponent.isMeasurementsExistPlaceholder()) {
-          String measurementTemplate = intoItems.get(0).getIntoMeasurements().get(0);
-          for (int i = 0; i < targetDevices.size(); i++) {
-            PartialPath targetDevice = targetDevices.get(i);
-            for (Expression outputColumn : outputColumns) {
-              PartialPath intoPath =
-                  targetDevice.concatNode(
-                      constructIntoMeasurement(
-                          sourceDevices.get(i).concatNode(outputColumn.toString()),
-                          measurementTemplate));
-              intoPathPatternTreeMap.append(intoPath, outputColumn.toString());
-              if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
-                throw new SemanticException(
-                    "select into: target paths in into clause should be different.");
-              }
-            }
-          }
+    intoComponent.validate(sourceDevices, sourceColumns);
+
+    IntoDeviceMeasurementDescriptor intoDeviceMeasurementDescriptor =
+        new IntoDeviceMeasurementDescriptor();
+    IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator =
+        intoComponent.getIntoDeviceMeasurementIterator();
+    for (PartialPath sourceDevice : sourceDevices) {
+      PartialPath deviceTemplate = intoDeviceMeasurementIterator.getDeviceTemplate();
+      boolean isAlignedDevice = intoDeviceMeasurementIterator.isAlignedDevice();
+      PartialPath targetDevice = constructTargetDevice(sourceDevice, deviceTemplate);
+      intoDeviceMeasurementDescriptor.specifyTargetDevice(sourceDevice, targetDevice);
+      intoDeviceMeasurementDescriptor.specifyDeviceAlignment(targetDevice, isAlignedDevice);
+
+      for (Expression sourceColumn : sourceColumns) {
+        String measurementTemplate = intoDeviceMeasurementIterator.getMeasurementTemplate();
+        String targetMeasurement;
+        if (sourceColumn instanceof TimeSeriesOperand) {
+          targetMeasurement =
+              constructTargetMeasurement(
+                  sourceDevice.concatNode(sourceColumn.toString()), measurementTemplate);
         } else {
-
+          targetMeasurement = sourceColumn.toString();
         }
-      }
-    } else {
-
-    }
-    analyzeIntoDevices(analysis, intoItems);
-    analysis.setIntoPathPatternTreeMap(intoPathPatternTreeMap);
-  }
-
-  private PartialPath constructIntoDevice(PartialPath sourceDevice, PartialPath deviceTemplate) {
-    String[] sourceNodes = sourceDevice.getNodes();
-    String[] templateNodes = deviceTemplate.getNodes();
-
-    List<String> targetNodes = new ArrayList<>();
-    for (int nodeIndex = 0; nodeIndex < templateNodes.length; nodeIndex++) {
-      String curNode = templateNodes[nodeIndex];
-      if (curNode.equals(DOUBLE_COLONS)) {
-        if (nodeIndex != templateNodes.length - 1) {
-          throw new SemanticException("");
-        }
-        for (; nodeIndex < sourceNodes.length; nodeIndex++) {
-          targetNodes.add(sourceNodes[nodeIndex]);
-        }
-        break;
+        intoDeviceMeasurementDescriptor.specifyTargetMeasurement(
+            targetDevice, sourceColumn.toString(), targetMeasurement);
+        intoDeviceMeasurementIterator.nextMeasurement();
       }
 
-      String resNode = curNode;
-      Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
-      while (matcher.find()) {
-        String param = matcher.group();
-        int index;
-        try {
-          index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
-        } catch (NumberFormatException e) {
-          throw new SemanticException("select into: the i of ${i} should be an integer.");
-        }
-        if (index < 1 || index >= sourceNodes.length) {
-          throw new SemanticException(
-              "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
-        }
-        resNode = matcher.replaceFirst(sourceNodes[index]);
-        matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
-      }
-      targetNodes.add(resNode);
+      intoDeviceMeasurementIterator.nextDevice();
     }
-    return new PartialPath(targetNodes.toArray(new String[0]));
+    analysis.setIntoDeviceMeasurementDescriptor(intoDeviceMeasurementDescriptor);
   }
 
   private void analyzeInto(
       Analysis analysis,
       QueryStatement queryStatement,
       List<Pair<Expression, String>> outputExpressions) {
-    if (queryStatement.getIntoComponent() == null) {
+    if (!queryStatement.isSelectInto()) {
       return;
     }
 
-    List<Expression> outputColumns =
+    List<Expression> sourceColumns =
         outputExpressions.stream()
             .map(Pair::getLeft)
             .collect(Collectors.toCollection(ArrayList::new));
-    boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(outputColumns);
 
     IntoComponent intoComponent = queryStatement.getIntoComponent();
-    List<IntoItem> intoItems = intoComponent.getIntoItems();
-
-    List<PartialPath> intoPaths = new ArrayList<>();
-    PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap =
-        PatternTreeMapFactory.getIntoPathPatternTreeMap();
-
-    if (isAllRawSeriesQuery) {
-      List<PartialPath> sourcePaths =
-          outputColumns.stream()
-              .map(expression -> ((TimeSeriesOperand) expression).getPath())
-              .collect(Collectors.toList());
-
-      if (intoComponent.isDeviceExistPlaceholder()) {
-        if (intoComponent.isMeasurementsExistPlaceholder()) {
-          if (intoItems.size() > 1) {
-            throw new SemanticException("");
-          }
-          if (intoItems.get(0).getIntoMeasurements().size() > 1) {
-            throw new SemanticException("");
-          }
-
-          PartialPath deviceTemplate = intoItems.get(0).getIntoDevice();
-          String measurementTemplate = intoItems.get(0).getIntoMeasurements().get(0);
-          for (PartialPath sourcePath : sourcePaths) {
-            intoPaths.add(constructIntoPath(sourcePath, deviceTemplate, measurementTemplate));
-          }
-        } else {
-          int sourcePathIndex = 0;
-          for (IntoItem intoItem : intoItems) {
-            PartialPath deviceTemplate = intoItem.getIntoDevice();
-            List<String> measurementList = intoItem.getIntoMeasurements();
-            for (String measurement : measurementList) {
-              intoPaths.add(
-                  constructIntoPath(sourcePaths.get(sourcePathIndex), deviceTemplate, measurement));
-              sourcePathIndex++;
-            }
-          }
-        }
+    intoComponent.validate(sourceColumns);
+
+    IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
+    IntoComponent.IntoPathIterator intoPathIterator = intoComponent.getIntoPathIterator();
+    for (Expression sourceColumn : sourceColumns) {
+      PartialPath deviceTemplate = intoPathIterator.getDeviceTemplate();
+      String measurementTemplate = intoPathIterator.getMeasurementTemplate();
+      boolean isAlignedDevice = intoPathIterator.isAlignedDevice();
+
+      PartialPath targetPath;
+      if (sourceColumn instanceof TimeSeriesOperand) {
+        PartialPath sourcePath = ((TimeSeriesOperand) sourceColumn).getPath();
+        targetPath = constructTargetPath(sourcePath, deviceTemplate, measurementTemplate);
       } else {
-        if (intoComponent.isMeasurementsExistPlaceholder()) {
-          if (intoItems.size() != outputColumns.size()) {
-            throw new SemanticException(
-                "select into: the number of source columns and the number of target paths should be the same.");
-          }
-
-          for (int i = 0; i < intoItems.size(); i++) {
-            if (intoItems.get(i).getIntoMeasurements().size() != 1) {
-              throw new SemanticException("");
-            }
-
-            PartialPath targetDevice = intoItems.get(i).getIntoDevice();
-            intoPaths.add(
-                targetDevice.concatNode(
-                    constructIntoMeasurement(
-                        sourcePaths.get(i), intoItems.get(0).getIntoMeasurements().get(0))));
-          }
-        } else {
-          intoPaths =
-              intoItems.stream()
-                  .map(IntoItem::getIntoPaths)
-                  .flatMap(List::stream)
-                  .collect(Collectors.toList());
-        }
-      }
-    } else {
-      // disable placeholder
-      for (IntoItem intoItem : intoItems) {
-        if (intoItem.isDeviceExistPlaceholder() || intoItem.isMeasurementsExistPlaceholder()) {
-          throw new SemanticException(
-              "select into: placeholders can only be used in raw timeseries data queries.");
-        }
-      }
-
-      intoPaths =
-          intoItems.stream()
-              .map(IntoItem::getIntoPaths)
-              .flatMap(List::stream)
-              .collect(Collectors.toList());
-    }
-
-    // check quantity consistency
-    if (intoPaths.size() != outputColumns.size()) {
-      throw new SemanticException(
-          "select into: the number of source columns and the number of target paths should be the same.");
-    }
-
-    for (int i = 0; i < intoPaths.size(); i++) {
-      intoPathPatternTreeMap.append(intoPaths.get(i), outputColumns.get(i).toString());
-      if (intoPathPatternTreeMap.getOverlapped(intoPaths.get(i)).size() > 1) {
-        throw new SemanticException(
-            "select into: target paths in into clause should be different.");
-      }
-    }
-
-    analyzeIntoDevices(analysis, intoItems);
-    analysis.setIntoPathPatternTreeMap(intoPathPatternTreeMap);
-  }
-
-  private void analyzeIntoDevices(Analysis analysis, List<IntoItem> intoItems) {
-    Map<PartialPath, Boolean> intoDeviceToAlignedMap = new HashMap<>();
-    for (IntoItem intoItem : intoItems) {
-      PartialPath devicePath = intoItem.getIntoDevice();
-      boolean isAligned = intoItem.isAligned();
-      if (intoDeviceToAlignedMap.containsKey(devicePath)
-          && intoDeviceToAlignedMap.get(devicePath) != isAligned) {
-        throw new SemanticException(
-            String.format(
-                "select into: inconsistent alignment property specified for device '%s'.",
-                devicePath));
-      }
-      intoDeviceToAlignedMap.put(devicePath, isAligned);
-    }
-    analysis.setIntoDeviceToAlignedMap(intoDeviceToAlignedMap);
-  }
-
-  private boolean checkIsAllRawSeriesQuery(List<Expression> expressions) {
-    for (Expression expression : expressions) {
-      if (!(expression instanceof TimeSeriesOperand)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private PartialPath constructIntoPath(
-      PartialPath sourcePath, PartialPath deviceTemplate, String measurementTemplate) {
-    PartialPath targetDevice = constructIntoDevice(sourcePath.getDevicePath(), deviceTemplate);
-    String targetMeasurement = constructIntoMeasurement(sourcePath, measurementTemplate);
-    return targetDevice.concatNode(targetMeasurement);
-  }
-
-  private String constructIntoMeasurement(PartialPath sourcePath, String measurementTemplate) {
-    if (measurementTemplate.equals(DOUBLE_COLONS)) {
-      return sourcePath.getMeasurement();
-    }
-
-    String[] sourceNodes = sourcePath.getNodes();
-    String resNode = measurementTemplate;
-    Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
-    while (matcher.find()) {
-      String param = matcher.group();
-      int index;
-      try {
-        index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
-      } catch (NumberFormatException e) {
-        throw new SemanticException("select into: the i of ${i} should be an integer.");
-      }
-      if (index < 1 || index >= sourceNodes.length) {
-        throw new SemanticException(
-            "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
+        targetPath = deviceTemplate.concatNode(measurementTemplate);
       }
-      resNode = matcher.replaceFirst(sourceNodes[index]);
-      matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
+      intoPathDescriptor.specifyTargetPath(sourceColumn.toString(), targetPath);
+      intoPathDescriptor.specifyDeviceAlignment(targetPath.getDevicePath(), isAlignedDevice);
+      intoPathIterator.next();
     }
-    return resNode;
+    analysis.setIntoPathDescriptor(intoPathDescriptor);
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
new file mode 100644
index 0000000000..18107823e5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+
+public class SelectIntoUtils {
+
+  public static PartialPath constructTargetPath(
+      PartialPath sourcePath, PartialPath deviceTemplate, String measurementTemplate) {
+    PartialPath targetDevice = constructTargetDevice(sourcePath.getDevicePath(), deviceTemplate);
+    String targetMeasurement = constructTargetMeasurement(sourcePath, measurementTemplate);
+    return targetDevice.concatNode(targetMeasurement);
+  }
+
+  public static PartialPath constructTargetDevice(
+      PartialPath sourceDevice, PartialPath deviceTemplate) {
+    String[] sourceNodes = sourceDevice.getNodes();
+    String[] templateNodes = deviceTemplate.getNodes();
+
+    List<String> targetNodes = new ArrayList<>();
+    for (int nodeIndex = 0; nodeIndex < templateNodes.length; nodeIndex++) {
+      String curNode = templateNodes[nodeIndex];
+      if (curNode.equals(DOUBLE_COLONS)) {
+        if (nodeIndex != templateNodes.length - 1) {
+          throw new SemanticException(
+              "select into: placeholder `::` can only be used at the end of the path.");
+        }
+        for (; nodeIndex < sourceNodes.length; nodeIndex++) {
+          targetNodes.add(sourceNodes[nodeIndex]);
+        }
+        break;
+      }
+
+      String resNode = applyLevelPlaceholder(curNode, sourceNodes);
+      targetNodes.add(resNode);
+    }
+    return new PartialPath(targetNodes.toArray(new String[0]));
+  }
+
+  public static String constructTargetMeasurement(
+      PartialPath sourcePath, String measurementTemplate) {
+    if (measurementTemplate.equals(DOUBLE_COLONS)) {
+      return sourcePath.getMeasurement();
+    }
+    return applyLevelPlaceholder(measurementTemplate, sourcePath.getNodes());
+  }
+
+  private static String applyLevelPlaceholder(String templateNode, String[] sourceNodes) {
+    String resNode = templateNode;
+    Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
+    while (matcher.find()) {
+      String param = matcher.group();
+      int index;
+      try {
+        index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
+      } catch (NumberFormatException e) {
+        throw new SemanticException("select into: the i of ${i} should be an integer.");
+      }
+      if (index < 1 || index >= sourceNodes.length) {
+        throw new SemanticException(
+            "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
+      }
+      resNode = matcher.replaceFirst(sourceNodes[index]);
+      matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
+    }
+    return resNode;
+  }
+
+  public static boolean checkIsAllRawSeriesQuery(List<Expression> expressions) {
+    for (Expression expression : expressions) {
+      if (!(expression instanceof TimeSeriesOperand)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoDeviceMeasurementDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoDeviceMeasurementDescriptor.java
new file mode 100644
index 0000000000..c0a0ec120e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoDeviceMeasurementDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_DEVICE_ERROR_MSG;
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+/** Tracks the source bound to each select-into target device and measurement; rejects clashes. */
+public class IntoDeviceMeasurementDescriptor extends IntoPathDescriptor {
+
+  private final Map<PartialPath, PartialPath> targetDeviceToSourceDeviceMap = new HashMap<>();
+
+  public IntoDeviceMeasurementDescriptor() {}
+
+  /** Binds targetDevice to sourceDevice; fails if it is already bound to a different source. */
+  public void specifyTargetDevice(PartialPath sourceDevice, PartialPath targetDevice) {
+    boolean alreadyBound = targetDeviceToSourceDeviceMap.containsKey(targetDevice);
+    if (alreadyBound && !targetDeviceToSourceDeviceMap.get(targetDevice).equals(sourceDevice)) {
+      throw new SemanticException(DUPLICATE_TARGET_DEVICE_ERROR_MSG);
+    }
+    targetDeviceToSourceDeviceMap.put(targetDevice, sourceDevice);
+  }
+
+  /** Maps targetMeasurement of targetDevice to sourceColumn; each measurement may be set once. */
+  public void specifyTargetMeasurement(
+      PartialPath targetDevice, String sourceColumn, String targetMeasurement) {
+    Map<String, String> columnByMeasurement =
+        targetPathToSourceMap.computeIfAbsent(targetDevice, device -> new HashMap<>());
+    if (columnByMeasurement.containsKey(targetMeasurement)) {
+      throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+    }
+    columnByMeasurement.put(targetMeasurement, sourceColumn);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
new file mode 100644
index 0000000000..d39515dcf0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+/** Records, per select-into target device, the source of each measurement and the alignment. */
+public class IntoPathDescriptor {
+
+  protected final Map<PartialPath, Map<String, String>> targetPathToSourceMap = new HashMap<>();
+  protected final Map<PartialPath, Boolean> deviceToAlignedMap = new HashMap<>();
+
+  public IntoPathDescriptor() {}
+
+  /** Registers sourceColumn as the source of targetPath; each target path may be set once. */
+  public void specifyTargetPath(String sourceColumn, PartialPath targetPath) {
+    PartialPath device = targetPath.getDevicePath();
+    String measurement = targetPath.getMeasurement();
+    // the per-device map is created the first time a device is seen
+    Map<String, String> columnByMeasurement =
+        targetPathToSourceMap.computeIfAbsent(device, unused -> new HashMap<>());
+    if (columnByMeasurement.containsKey(measurement)) {
+      throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+    }
+    columnByMeasurement.put(measurement, sourceColumn);
+  }
+
+  /** Stores the alignment flag of targetDevice; fails if a different flag was stored before. */
+  public void specifyDeviceAlignment(PartialPath targetDevice, boolean isAligned) {
+    boolean known = deviceToAlignedMap.containsKey(targetDevice);
+    if (known && deviceToAlignedMap.get(targetDevice) != isAligned) {
+      throw new SemanticException(
+          "select into: alignment property must be the same for the same device.");
+    }
+    deviceToAlignedMap.put(targetDevice, isAligned);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
index c5848a3752..223e10602f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
@@ -19,23 +19,37 @@
 
 package org.apache.iotdb.db.mpp.plan.statement.component;
 
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 
 import java.util.List;
 
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.checkIsAllRawSeriesQuery;
+
 /** This class maintains information of {@code INTO} clause. */
 public class IntoComponent extends StatementNode {
 
+  // Messages of the SemanticExceptions thrown for an invalid INTO clause (immutable constants).
+  public static final String PLACEHOLDER_MISMATCH_ERROR_MSG =
+      "select into: the correspondence between the placeholder and the raw time series could not be established.";
+  public static final String FORBID_PLACEHOLDER_ERROR_MSG =
+      "select into: placeholders can only be used in raw time series data queries.";
+  public static final String DEVICE_NUM_MISMATCH_ERROR_MSG =
+      "select into: the number of source devices and the number of target devices should be the same.";
+  public static final String PATH_NUM_MISMATCH_ERROR_MSG =
+      "select into: the number of source columns and the number of target paths should be the same.";
+  public static final String DUPLICATE_TARGET_DEVICE_ERROR_MSG =
+      "select into: target devices in into clause should be different.";
+  public static final String DUPLICATE_TARGET_PATH_ERROR_MSG =
+      "select into: target paths in into clause should be different.";
   private final List<IntoItem> intoItems;
 
   public IntoComponent(List<IntoItem> intoItems) {
     this.intoItems = intoItems;
   }
 
-  public List<IntoItem> getIntoItems() {
-    return intoItems;
-  }
-
   public boolean isDeviceExistPlaceholder() {
     for (IntoItem intoItem : intoItems) {
       if (intoItem.isDeviceExistPlaceholder()) {
@@ -53,4 +67,170 @@ public class IntoComponent extends StatementNode {
     }
     return false;
   }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // used in ALIGN BY TIME
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks this INTO clause against sourceColumns. Throws SemanticException if placeholders are
+   * used although not every column is a raw series, or if the target and source counts differ.
+   */
+  public void validate(List<Expression> sourceColumns) {
+    boolean allRaw = checkIsAllRawSeriesQuery(sourceColumns);
+    boolean devicePlaceholder = isDeviceExistPlaceholder();
+    boolean measurementPlaceholder = isMeasurementsExistPlaceholder();
+    if (!allRaw && (devicePlaceholder || measurementPlaceholder)) {
+      throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+    }
+
+    if (!measurementPlaceholder) {
+      int targetPathCount =
+          intoItems.stream().mapToInt(intoItem -> intoItem.getIntoMeasurements().size()).sum();
+      if (targetPathCount != sourceColumns.size()) {
+        throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+      }
+      return;
+    }
+
+    for (IntoItem item : intoItems) {
+      if (item.getIntoMeasurements().size() != 1) {
+        throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+      }
+    }
+    if (devicePlaceholder && intoItems.size() != 1) {
+      throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+    }
+    if (!devicePlaceholder && intoItems.size() != sourceColumns.size()) {
+      throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+    }
+  }
+
+  public IntoPathIterator getIntoPathIterator() {
+    boolean devicePlaceholder = isDeviceExistPlaceholder(); // computed once, kept by the iterator
+    return new IntoPathIterator(intoItems, devicePlaceholder, isMeasurementsExistPlaceholder());
+  }
+
+  /** Cursor over the into items; {@link #next()} moves to the following target path template. */
+  public static class IntoPathIterator extends AbstractIntoIterator {
+
+    public IntoPathIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+    }
+
+    /** Advances the cursor; with placeholders on both device and measurement it stays put. */
+    public void next() {
+      if (isMeasurementsExistPlaceholder) {
+        // measurementIndex is not touched here; only the item index may advance
+        deviceIndex += isDeviceExistPlaceholder ? 0 : 1;
+        return;
+      }
+      if (++measurementIndex == intoItems.get(deviceIndex).getIntoMeasurements().size()) {
+        deviceIndex++;
+        measurementIndex = 0;
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // used in ALIGN BY DEVICE
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks this INTO clause against the source devices and columns.
+   *
+   * <p>Rules enforced: a measurement placeholder needs every source column to be a raw series;
+   * the clause has exactly one into item if a device placeholder is present, otherwise one per
+   * source device; each into item lists exactly one measurement if it contains a placeholder,
+   * otherwise one per source column.
+   *
+   * @throws SemanticException if any rule is violated
+   */
+  public void validate(List<PartialPath> sourceDevices, List<Expression> sourceColumns) {
+    if (!checkIsAllRawSeriesQuery(sourceColumns) && isMeasurementsExistPlaceholder()) {
+      throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+    }
+
+    boolean devicePlaceholder = isDeviceExistPlaceholder();
+    if (devicePlaceholder && intoItems.size() != 1) {
+      throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+    }
+    if (!devicePlaceholder && intoItems.size() != sourceDevices.size()) {
+      throw new SemanticException(DEVICE_NUM_MISMATCH_ERROR_MSG);
+    }
+
+    for (IntoItem item : intoItems) {
+      boolean measurementPlaceholder = item.isMeasurementsExistPlaceholder();
+      int expectedSize = measurementPlaceholder ? 1 : sourceColumns.size();
+      if (item.getIntoMeasurements().size() != expectedSize) {
+        throw new SemanticException(
+            measurementPlaceholder ? PLACEHOLDER_MISMATCH_ERROR_MSG : PATH_NUM_MISMATCH_ERROR_MSG);
+      }
+    }
+  }
+
+  public IntoDeviceMeasurementIterator getIntoDeviceMeasurementIterator() {
+    boolean devFlag = isDeviceExistPlaceholder(); // computed once, kept by the iterator
+    return new IntoDeviceMeasurementIterator(intoItems, devFlag, isMeasurementsExistPlaceholder());
+  }
+
+  public static class IntoDeviceMeasurementIterator extends AbstractIntoIterator {
+
+    public IntoDeviceMeasurementIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+    }
+
+    /** Moves to the next into item unless the device is a placeholder; rewinds measurements. */
+    public void nextDevice() {
+      deviceIndex += isDeviceExistPlaceholder ? 0 : 1;
+      measurementIndex = 0; // rewind even when a placeholder device keeps the same item
+    }
+
+    /** Moves to the next measurement unless the current item has a measurement placeholder. */
+    public void nextMeasurement() {
+      if (!intoItems.get(deviceIndex).isMeasurementsExistPlaceholder()) {
+        measurementIndex++;
+      }
+    }
+  }
+
+  /** Common cursor state of the INTO iterators: current into item and current measurement. */
+  public abstract static class AbstractIntoIterator {
+
+    protected final List<IntoItem> intoItems;
+    protected final boolean isDeviceExistPlaceholder;
+    protected final boolean isMeasurementsExistPlaceholder;
+    protected int deviceIndex = 0;
+    protected int measurementIndex = 0;
+
+    public AbstractIntoIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      this.intoItems = intoItems;
+      this.isDeviceExistPlaceholder = isDeviceExistPlaceholder;
+      this.isMeasurementsExistPlaceholder = isMeasurementsExistPlaceholder;
+    }
+
+    /** Returns the device template of the current into item. */
+    public PartialPath getDeviceTemplate() {
+      return intoItems.get(deviceIndex).getIntoDevice();
+    }
+
+    /** Returns the measurement template at the current position of the current into item. */
+    public String getMeasurementTemplate() {
+      return intoItems.get(deviceIndex).getIntoMeasurements().get(measurementIndex);
+    }
+
+    /** Returns the aligned flag of the current into item. */
+    public boolean isAlignedDevice() {
+      return intoItems.get(deviceIndex).isAligned();
+    }
+  }
 }