You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/18 12:12:59 UTC

[iotdb] branch master updated: [IOTDB-4256] Implement analyzer for SELECT INTO statement (#7588)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a078e88daa [IOTDB-4256] Implement analyzer for SELECT INTO statement (#7588)
a078e88daa is described below

commit a078e88daab0e1d0fedccd6bfe360dc1a10a2a10
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Tue Oct 18 20:12:53 2022 +0800

    [IOTDB-4256] Implement analyzer for SELECT INTO statement (#7588)
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  14 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   1 +
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   5 +
 .../org/apache/iotdb/commons/path/PartialPath.java |  13 ++
 .../db/mpp/common/header/ColumnHeaderConstant.java |  19 ++
 .../db/mpp/common/header/DatasetHeaderFactory.java |   6 +
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  29 +++
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 113 +++++++++-
 .../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 +++++++++
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  99 ++++++++-
 .../parameter/DeviceViewIntoPathDescriptor.java    | 208 ++++++++++++++++++
 .../planner/plan/parameter/IntoPathDescriptor.java | 167 +++++++++++++++
 .../plan/statement/component/IntoComponent.java    | 237 +++++++++++++++++++++
 .../db/mpp/plan/statement/component/IntoItem.java  |  73 +++++++
 .../db/mpp/plan/statement/crud/QueryStatement.java |  38 ++++
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  34 +--
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     | 169 +++++++++++++++
 .../tsfile/common/constant/TsFileConstant.java     |   4 +
 18 files changed, 1308 insertions(+), 26 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 0bb4ad877d..26be682263 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -400,11 +400,21 @@ selectStatement
 
 intoClause
     : INTO ALIGNED? intoPath (COMMA intoPath)*
+    | INTO intoItem (COMMA intoItem)*
     ;
 
 intoPath
-    : fullPath
-    | nodeNameWithoutWildcard (DOT nodeNameWithoutWildcard)*
+    : ROOT (DOT nodeNameInIntoPath)* #fullPathInIntoPath
+    | nodeNameInIntoPath (DOT nodeNameInIntoPath)* #suffixPathInIntoPath
+    ;
+
+intoItem
+    : ALIGNED? intoPath LR_BRACKET nodeNameInIntoPath (COMMA nodeNameInIntoPath)* RR_BRACKET
+    ;
+
+nodeNameInIntoPath
+    : nodeNameWithoutWildcard
+    | DOUBLE_COLON
     ;
 
 specialClause
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index cdc57dad4d..af9c837c52 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -887,6 +887,7 @@ LR_BRACKET : '(';
 RR_BRACKET : ')';
 LS_BRACKET : '[';
 RS_BRACKET : ']';
+DOUBLE_COLON: '::';
 
 
 /**
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index b6c9cd7ed7..d5f647a82a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.conf;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 public class IoTDBConstant {
 
@@ -283,4 +284,8 @@ public class IoTDBConstant {
     V_0_12,
     V_0_13
   }
+
+  // select into
+  public static final Pattern LEVELED_PATH_TEMPLATE_PATTERN = Pattern.compile("\\$\\{\\w+}");
+  public static final String DOUBLE_COLONS = "::";
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index dc566ded43..8d47504e3d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -616,6 +616,19 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
     return true;
   }
 
+  public boolean startWith(String otherNode) {
+    return nodes[0].equals(otherNode);
+  }
+
+  public boolean containNode(String otherNode) {
+    for (String node : nodes) {
+      if (node.equals(otherNode)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public String toString() {
     return getFullPath();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 587491ba2c..67bd84e06b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -115,6 +115,12 @@ public class ColumnHeaderConstant {
   public static final String COLUMN_PIPE_STATUS = "status";
   public static final String COLUMN_PIPE_MESSAGE = "message";
 
+  // column names for select into
+  public static final String COLUMN_SOURCE_DEVICE = "source device";
+  public static final String COLUMN_SOURCE_COLUMN = "source column";
+  public static final String COLUMN_TARGET_TIMESERIES = "target timeseries";
+  public static final String COLUMN_WRITTEN = "written";
+
   public static final List<ColumnHeader> lastQueryColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
@@ -276,6 +282,19 @@ public class ColumnHeaderConstant {
           new ColumnHeader(COLUMN_PIPE_STATUS, TSDataType.TEXT),
           new ColumnHeader(COLUMN_PIPE_MESSAGE, TSDataType.TEXT));
 
+  public static final List<ColumnHeader> selectIntoColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(COLUMN_SOURCE_COLUMN, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_TARGET_TIMESERIES, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_WRITTEN, TSDataType.INT32));
+
+  public static final List<ColumnHeader> selectIntoAlignByDeviceColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(COLUMN_SOURCE_DEVICE, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_SOURCE_COLUMN, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_TARGET_TIMESERIES, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_WRITTEN, TSDataType.INT32));
+
   public static final List<ColumnHeader> getRoutingColumnHeaders =
       ImmutableList.of(new ColumnHeader(COLUMN_REGION_ID, TSDataType.INT32));
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index 835e6df4a2..7c537f791e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -140,4 +140,10 @@ public class DatasetHeaderFactory {
   public static DatasetHeader getGetTimeSlotListHeader() {
     return new DatasetHeader(ColumnHeaderConstant.getTimeSlotListColumnHeaders, true);
   }
+
+  public static DatasetHeader getSelectIntoHeader(boolean isAlignByDevice) {
+    return isAlignByDevice
+        ? new DatasetHeader(ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders, true)
+        : new DatasetHeader(ColumnHeaderConstant.selectIntoColumnHeaders, true);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 8e346a03a8..5cec1e6d04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -29,8 +29,10 @@ import org.apache.iotdb.db.mpp.common.NodeRef;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -157,6 +159,16 @@ public class Analysis {
   // header of result dataset
   private DatasetHeader respDatasetHeader;
 
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // SELECT INTO Analysis
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  // used in ALIGN BY DEVICE
+  private DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor;
+
+  // used in ALIGN BY TIME
+  private IntoPathDescriptor intoPathDescriptor;
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   // Schema Query Analysis
   /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -437,6 +449,23 @@ public class Analysis {
     this.deviceViewOutputExpressions = deviceViewOutputExpressions;
   }
 
+  public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
+    return deviceViewIntoPathDescriptor;
+  }
+
+  public void setDeviceViewIntoPathDescriptor(
+      DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
+    this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
+  }
+
+  public IntoPathDescriptor getIntoPathDescriptor() {
+    return intoPathDescriptor;
+  }
+
+  public void setIntoPathDescriptor(IntoPathDescriptor intoPathDescriptor) {
+    this.intoPathDescriptor = intoPathDescriptor;
+  }
+
   public List<String> getTagKeys() {
     return tagKeys;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index db5dd1bc2c..77e086f4eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -52,14 +52,17 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
@@ -147,6 +150,9 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDC
 import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
 import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetDevice;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetMeasurement;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetPath;
 
 /** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
 public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> {
@@ -203,6 +209,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       logger.info("[EndFetchSchema]");
       // If there is no leaf node in the schema tree, the query should be completed immediately
       if (schemaTree.isEmpty()) {
+        if (queryStatement.isSelectInto()) {
+          analysis.setRespDatasetHeader(
+              DatasetHeaderFactory.getSelectIntoHeader(queryStatement.isAlignByDevice()));
+        }
         if (queryStatement.isLastQuery()) {
           analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
         }
@@ -237,6 +247,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
 
         analyzeDeviceToSource(analysis, queryStatement);
         analyzeDeviceView(analysis, queryStatement, outputExpressions);
+
+        analyzeInto(analysis, queryStatement, deviceSet, outputExpressions);
       } else {
         outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree);
 
@@ -255,6 +267,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         analyzeSourceTransform(analysis, queryStatement);
 
         analyzeSource(analysis, queryStatement);
+
+        analyzeInto(analysis, queryStatement, outputExpressions);
       }
 
       analyzeGroupBy(analysis, queryStatement);
@@ -409,15 +423,15 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     // device path patterns in FROM clause
     List<PartialPath> devicePatternList = queryStatement.getFromComponent().getPrefixPaths();
 
-    Set<PartialPath> deviceList = new LinkedHashSet<>();
+    Set<PartialPath> deviceSet = new LinkedHashSet<>();
     for (PartialPath devicePattern : devicePatternList) {
       // get all matched devices
-      deviceList.addAll(
+      deviceSet.addAll(
           schemaTree.getMatchedDevices(devicePattern).stream()
               .map(DeviceSchemaInfo::getDevicePath)
               .collect(Collectors.toList()));
     }
-    return deviceList;
+    return deviceSet;
   }
 
   private List<Pair<Expression, String>> analyzeSelect(
@@ -996,6 +1010,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       Analysis analysis,
       QueryStatement queryStatement,
       List<Pair<Expression, String>> outputExpressions) {
+    if (queryStatement.isSelectInto()) {
+      analysis.setRespDatasetHeader(
+          DatasetHeaderFactory.getSelectIntoHeader(queryStatement.isAlignByDevice()));
+      return;
+    }
+
     boolean isIgnoreTimestamp =
         queryStatement.isAggregationQuery() && !queryStatement.isGroupByTime();
     List<ColumnHeader> columnHeaders = new ArrayList<>();
@@ -1072,6 +1092,93 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
   }
 
+  private void analyzeInto(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      Set<PartialPath> deviceSet,
+      List<Pair<Expression, String>> outputExpressions) {
+    if (!queryStatement.isSelectInto()) {
+      return;
+    }
+
+    List<PartialPath> sourceDevices = new ArrayList<>(deviceSet);
+    List<Expression> sourceColumns =
+        outputExpressions.stream()
+            .map(Pair::getLeft)
+            .collect(Collectors.toCollection(ArrayList::new));
+
+    IntoComponent intoComponent = queryStatement.getIntoComponent();
+    intoComponent.validate(sourceDevices, sourceColumns);
+
+    DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor = new DeviceViewIntoPathDescriptor();
+    IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator =
+        intoComponent.getIntoDeviceMeasurementIterator();
+    for (PartialPath sourceDevice : sourceDevices) {
+      PartialPath deviceTemplate = intoDeviceMeasurementIterator.getDeviceTemplate();
+      boolean isAlignedDevice = intoDeviceMeasurementIterator.isAlignedDevice();
+      PartialPath targetDevice = constructTargetDevice(sourceDevice, deviceTemplate);
+      deviceViewIntoPathDescriptor.specifyDeviceAlignment(targetDevice.toString(), isAlignedDevice);
+
+      for (Expression sourceColumn : sourceColumns) {
+        String measurementTemplate = intoDeviceMeasurementIterator.getMeasurementTemplate();
+        String targetMeasurement;
+        if (sourceColumn instanceof TimeSeriesOperand) {
+          targetMeasurement =
+              constructTargetMeasurement(
+                  sourceDevice.concatNode(sourceColumn.toString()), measurementTemplate);
+        } else {
+          targetMeasurement = measurementTemplate;
+        }
+        deviceViewIntoPathDescriptor.specifyTargetDeviceMeasurement(
+            sourceDevice, targetDevice, sourceColumn.toString(), targetMeasurement);
+        intoDeviceMeasurementIterator.nextMeasurement();
+      }
+
+      intoDeviceMeasurementIterator.nextDevice();
+    }
+    deviceViewIntoPathDescriptor.validate();
+    analysis.setDeviceViewIntoPathDescriptor(deviceViewIntoPathDescriptor);
+  }
+
+  private void analyzeInto(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      List<Pair<Expression, String>> outputExpressions) {
+    if (!queryStatement.isSelectInto()) {
+      return;
+    }
+
+    List<Expression> sourceColumns =
+        outputExpressions.stream()
+            .map(Pair::getLeft)
+            .collect(Collectors.toCollection(ArrayList::new));
+
+    IntoComponent intoComponent = queryStatement.getIntoComponent();
+    intoComponent.validate(sourceColumns);
+
+    IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
+    IntoComponent.IntoPathIterator intoPathIterator = intoComponent.getIntoPathIterator();
+    for (Expression sourceColumn : sourceColumns) {
+      PartialPath deviceTemplate = intoPathIterator.getDeviceTemplate();
+      String measurementTemplate = intoPathIterator.getMeasurementTemplate();
+      boolean isAlignedDevice = intoPathIterator.isAlignedDevice();
+
+      PartialPath targetPath;
+      if (sourceColumn instanceof TimeSeriesOperand) {
+        PartialPath sourcePath = ((TimeSeriesOperand) sourceColumn).getPath();
+        targetPath = constructTargetPath(sourcePath, deviceTemplate, measurementTemplate);
+      } else {
+        targetPath = deviceTemplate.concatNode(measurementTemplate);
+      }
+      intoPathDescriptor.specifyTargetPath(sourceColumn.toString(), targetPath);
+      intoPathDescriptor.specifyDeviceAlignment(
+          targetPath.getDevicePath().toString(), isAlignedDevice);
+      intoPathIterator.next();
+    }
+    intoPathDescriptor.validate();
+    analysis.setIntoPathDescriptor(intoPathDescriptor);
+  }
+
   /**
    * Check datatype consistency in ALIGN BY DEVICE.
    *
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
new file mode 100644
index 0000000000..18107823e5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+
+public class SelectIntoUtils {
+
+  public static PartialPath constructTargetPath(
+      PartialPath sourcePath, PartialPath deviceTemplate, String measurementTemplate) {
+    PartialPath targetDevice = constructTargetDevice(sourcePath.getDevicePath(), deviceTemplate);
+    String targetMeasurement = constructTargetMeasurement(sourcePath, measurementTemplate);
+    return targetDevice.concatNode(targetMeasurement);
+  }
+
+  public static PartialPath constructTargetDevice(
+      PartialPath sourceDevice, PartialPath deviceTemplate) {
+    String[] sourceNodes = sourceDevice.getNodes();
+    String[] templateNodes = deviceTemplate.getNodes();
+
+    List<String> targetNodes = new ArrayList<>();
+    for (int nodeIndex = 0; nodeIndex < templateNodes.length; nodeIndex++) {
+      String curNode = templateNodes[nodeIndex];
+      if (curNode.equals(DOUBLE_COLONS)) {
+        if (nodeIndex != templateNodes.length - 1) {
+          throw new SemanticException(
+              "select into: placeholder `::` can only be used at the end of the path.");
+        }
+        for (; nodeIndex < sourceNodes.length; nodeIndex++) {
+          targetNodes.add(sourceNodes[nodeIndex]);
+        }
+        break;
+      }
+
+      String resNode = applyLevelPlaceholder(curNode, sourceNodes);
+      targetNodes.add(resNode);
+    }
+    return new PartialPath(targetNodes.toArray(new String[0]));
+  }
+
+  public static String constructTargetMeasurement(
+      PartialPath sourcePath, String measurementTemplate) {
+    if (measurementTemplate.equals(DOUBLE_COLONS)) {
+      return sourcePath.getMeasurement();
+    }
+    return applyLevelPlaceholder(measurementTemplate, sourcePath.getNodes());
+  }
+
+  private static String applyLevelPlaceholder(String templateNode, String[] sourceNodes) {
+    String resNode = templateNode;
+    Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
+    while (matcher.find()) {
+      String param = matcher.group();
+      int index;
+      try {
+        index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
+      } catch (NumberFormatException e) {
+        throw new SemanticException("select into: the i of ${i} should be an integer.");
+      }
+      if (index < 1 || index >= sourceNodes.length) {
+        throw new SemanticException(
+            "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
+      }
+      resNode = matcher.replaceFirst(sourceNodes[index]);
+      matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
+    }
+    return resNode;
+  }
+
+  public static boolean checkIsAllRawSeriesQuery(List<Expression> expressions) {
+    for (Expression expression : expressions) {
+      if (!(expression instanceof TimeSeriesOperand)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 3104265b4f..4267c29f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -72,6 +72,8 @@ import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.HavingCondition;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
@@ -810,18 +812,18 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
 
   @Override
   public Statement visitSelectStatement(IoTDBSqlParser.SelectStatementContext ctx) {
-    if (ctx.intoClause() != null) {
-      throw new SemanticException(
-          "The SELECT-INTO statement is not supported in the current version.");
-    }
-
     // initialize query statement
     queryStatement = new QueryStatement();
 
-    // parser select, from
+    // parse select, from
     parseSelectClause(ctx.selectClause());
     parseFromClause(ctx.fromClause());
 
+    // parse into clause
+    if (ctx.intoClause() != null) {
+      parseIntoClause(ctx.intoClause());
+    }
+
     // parse where clause
     if (ctx.whereClause() != null) {
       WhereCondition whereCondition = parseWhereClause(ctx.whereClause());
@@ -1405,6 +1407,44 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     }
   }
 
+  // parse INTO clause
+
+  private void parseIntoClause(IoTDBSqlParser.IntoClauseContext ctx) {
+    if (ctx.intoItem().size() > 0) {
+      List<IntoItem> intoItems = new ArrayList<>();
+      for (IoTDBSqlParser.IntoItemContext intoItemContext : ctx.intoItem()) {
+        intoItems.add(parseIntoItem(intoItemContext));
+      }
+      queryStatement.setIntoComponent(new IntoComponent(intoItems));
+    } else {
+      throw new SemanticException("The syntax of SELECT INTO statement has changed from v0.14");
+    }
+  }
+
+  private IntoItem parseIntoItem(IoTDBSqlParser.IntoItemContext intoItemContext) {
+    boolean isAligned = intoItemContext.ALIGNED() != null;
+    PartialPath intoDevice = parseIntoPath(intoItemContext.intoPath());
+    List<String> intoMeasurements =
+        intoItemContext.nodeNameInIntoPath().stream()
+            .map(this::parseNodeNameInIntoPath)
+            .collect(Collectors.toList());
+    return new IntoItem(intoDevice, intoMeasurements, isAligned);
+  }
+
+  private PartialPath parseIntoPath(IoTDBSqlParser.IntoPathContext intoPathContext) {
+    if (intoPathContext instanceof IoTDBSqlParser.FullPathInIntoPathContext) {
+      return parseFullPathInIntoPath((IoTDBSqlParser.FullPathInIntoPathContext) intoPathContext);
+    } else {
+      List<IoTDBSqlParser.NodeNameInIntoPathContext> nodeNames =
+          ((IoTDBSqlParser.SuffixPathInIntoPathContext) intoPathContext).nodeNameInIntoPath();
+      String[] path = new String[nodeNames.size()];
+      for (int i = 0; i < nodeNames.size(); i++) {
+        path[i] = parseNodeNameInIntoPath(nodeNames.get(i));
+      }
+      return new PartialPath(path);
+    }
+  }
+
   // Insert Statement ========================================================================
 
   @Override
@@ -1568,6 +1608,20 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return new PartialPath(path);
   }
 
+  private PartialPath parseFullPathInIntoPath(IoTDBSqlParser.FullPathInIntoPathContext ctx) {
+    List<IoTDBSqlParser.NodeNameInIntoPathContext> nodeNames = ctx.nodeNameInIntoPath();
+    String[] path = new String[nodeNames.size() + 1];
+    int i = 0;
+    if (ctx.ROOT() != null) {
+      path[0] = ctx.ROOT().getText();
+    }
+    for (IoTDBSqlParser.NodeNameInIntoPathContext nodeName : nodeNames) {
+      i++;
+      path[i] = parseNodeNameInIntoPath(nodeName);
+    }
+    return new PartialPath(path);
+  }
+
   private PartialPath parsePrefixPath(IoTDBSqlParser.PrefixPathContext ctx) {
     List<IoTDBSqlParser.NodeNameContext> nodeNames = ctx.nodeName();
     String[] path = new String[nodeNames.size() + 1];
@@ -1599,6 +1653,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return parseNodeString(ctx.getText());
   }
 
+  private String parseNodeNameInIntoPath(IoTDBSqlParser.NodeNameInIntoPathContext ctx) {
+    return parseNodeStringInIntoPath(ctx.getText());
+  }
+
   private String parseNodeString(String nodeName) {
     if (nodeName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)
         || nodeName.equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
@@ -1617,16 +1675,43 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return nodeName;
   }
 
+  private String parseNodeStringInIntoPath(String nodeName) {
+    if (nodeName.equals(IoTDBConstant.DOUBLE_COLONS)) {
+      return nodeName;
+    }
+    if (nodeName.startsWith(TsFileConstant.BACK_QUOTE_STRING)
+        && nodeName.endsWith(TsFileConstant.BACK_QUOTE_STRING)) {
+      String unWrapped = nodeName.substring(1, nodeName.length() - 1);
+      if (PathUtils.isRealNumber(unWrapped)
+          || !TsFileConstant.IDENTIFIER_PATTERN.matcher(unWrapped).matches()) {
+        return nodeName;
+      }
+      return unWrapped;
+    }
+    checkNodeNameInIntoPath(nodeName);
+    return nodeName;
+  }
+
   private void checkNodeName(String src) {
     // node name could start with * and end with *
     if (!TsFileConstant.NODE_NAME_PATTERN.matcher(src).matches()) {
-      throw new SQLParserException(
+      throw new SemanticException(
           String.format(
               "%s is illegal, unquoted node name can only consist of digits, characters and underscore, or start or end with wildcard",
               src));
     }
   }
 
+  private void checkNodeNameInIntoPath(String src) {
+    // ${} are allowed
+    if (!TsFileConstant.NODE_NAME_IN_INTO_PATH_PATTERN.matcher(src).matches()) {
+      throw new SQLParserException(
+          String.format(
+              "%s is illegal, unquoted node name in select into clause can only consist of digits, characters, $, { and }",
+              src));
+    }
+  }
+
   private void checkIdentifier(String src) {
     if (!TsFileConstant.IDENTIFIER_PATTERN.matcher(src).matches()) {
       throw new SQLParserException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
new file mode 100644
index 0000000000..080fd88b30
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+public class DeviceViewIntoPathDescriptor {
+
+  // device -> List<(sourceColumn, targetPath)>
+  private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap;
+
+  // targetDevice -> isAlignedDevice
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+
+  public DeviceViewIntoPathDescriptor() {
+    this.deviceToSourceTargetPathPairListMap = new HashMap<>();
+    this.targetDeviceToAlignedMap = new HashMap<>();
+  }
+
+  public DeviceViewIntoPathDescriptor(
+      Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
+      Map<String, Boolean> targetDeviceToAlignedMap) {
+    this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+  }
+
+  public void specifyTargetDeviceMeasurement(
+      PartialPath sourceDevice,
+      PartialPath targetDevice,
+      String sourceColumn,
+      String targetMeasurement) {
+    deviceToSourceTargetPathPairListMap
+        .computeIfAbsent(sourceDevice.toString(), key -> new ArrayList<>())
+        .add(new Pair<>(sourceColumn, targetDevice.concatNode(targetMeasurement)));
+  }
+
+  public void specifyDeviceAlignment(String targetDevice, boolean isAligned) {
+    if (targetDeviceToAlignedMap.containsKey(targetDevice)
+        && targetDeviceToAlignedMap.get(targetDevice) != isAligned) {
+      throw new SemanticException(
+          "select into: alignment property must be the same for the same device.");
+    }
+    targetDeviceToAlignedMap.put(targetDevice, isAligned);
+  }
+
+  public void validate() {
+    List<PartialPath> targetPaths =
+        deviceToSourceTargetPathPairListMap.values().stream()
+            .flatMap(List::stream)
+            .map(Pair::getRight)
+            .collect(Collectors.toList());
+    if (targetPaths.size() > new HashSet<>(targetPaths).size()) {
+      throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+    }
+  }
+
+  public Map<String, List<Pair<String, PartialPath>>> getDeviceToSourceTargetPathPairListMap() {
+    return deviceToSourceTargetPathPairListMap;
+  }
+
+  public Map<String, Map<PartialPath, Map<String, String>>> getSourceDeviceToTargetPathMap() {
+    // sourceDevice -> targetPathToSourceMap (for each device)
+    //  targetPathToSourceMap: targetDevice -> { targetMeasurement -> sourceColumn }
+    Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
+        new HashMap<>();
+    for (Map.Entry<String, List<Pair<String, PartialPath>>> sourceTargetEntry :
+        deviceToSourceTargetPathPairListMap.entrySet()) {
+      String sourceDevice = sourceTargetEntry.getKey();
+      List<Pair<String, PartialPath>> sourceTargetPathPairList = sourceTargetEntry.getValue();
+
+      Map<PartialPath, Map<String, String>> targetPathToSourceMap = new HashMap<>();
+      for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
+        String sourceColumn = sourceTargetPathPair.left;
+        PartialPath targetDevice = sourceTargetPathPair.right.getDevicePath();
+        String targetMeasurement = sourceTargetPathPair.right.getMeasurement();
+
+        targetPathToSourceMap
+            .computeIfAbsent(targetDevice, key -> new HashMap<>())
+            .put(targetMeasurement, sourceColumn);
+      }
+      sourceDeviceToTargetPathMap.put(sourceDevice, targetPathToSourceMap);
+    }
+    return sourceDeviceToTargetPathMap;
+  }
+
+  public Map<String, Boolean> getTargetDeviceToAlignedMap() {
+    return targetDeviceToAlignedMap;
+  }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(deviceToSourceTargetPathPairListMap.size(), byteBuffer);
+    for (Map.Entry<String, List<Pair<String, PartialPath>>> sourceTargetEntry :
+        deviceToSourceTargetPathPairListMap.entrySet()) {
+      ReadWriteIOUtils.write(sourceTargetEntry.getKey(), byteBuffer);
+
+      List<Pair<String, PartialPath>> sourceTargetPathPairList = sourceTargetEntry.getValue();
+      ReadWriteIOUtils.write(sourceTargetPathPairList.size(), byteBuffer);
+      for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
+        ReadWriteIOUtils.write(sourceTargetPathPair.left, byteBuffer);
+        sourceTargetPathPair.right.serialize(byteBuffer);
+      }
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), byteBuffer);
+    for (Map.Entry<String, Boolean> entry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+    }
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(deviceToSourceTargetPathPairListMap.size(), stream);
+    for (Map.Entry<String, List<Pair<String, PartialPath>>> sourceTargetEntry :
+        deviceToSourceTargetPathPairListMap.entrySet()) {
+      ReadWriteIOUtils.write(sourceTargetEntry.getKey(), stream);
+
+      List<Pair<String, PartialPath>> sourceTargetPathPairList = sourceTargetEntry.getValue();
+      ReadWriteIOUtils.write(sourceTargetPathPairList.size(), stream);
+      for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
+        ReadWriteIOUtils.write(sourceTargetPathPair.left, stream);
+        sourceTargetPathPair.right.serialize(stream);
+      }
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), stream);
+    for (Map.Entry<String, Boolean> entry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue(), stream);
+    }
+  }
+
+  public static DeviceViewIntoPathDescriptor deserialize(ByteBuffer byteBuffer) {
+    int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap =
+        new HashMap<>(mapSize);
+    for (int i = 0; i < mapSize; i++) {
+      String sourceDevice = ReadWriteIOUtils.readString(byteBuffer);
+      int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+      List<Pair<String, PartialPath>> sourceTargetPathPairList = new ArrayList<>(listSize);
+      for (int j = 0; j < listSize; j++) {
+        sourceTargetPathPairList.add(
+            new Pair<>(
+                ReadWriteIOUtils.readString(byteBuffer), PartialPath.deserialize(byteBuffer)));
+      }
+      deviceToSourceTargetPathPairListMap.put(sourceDevice, sourceTargetPathPairList);
+    }
+
+    mapSize = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<String, Boolean> targetDeviceToAlignedMap = new HashMap<>(mapSize);
+    for (int i = 0; i < mapSize; i++) {
+      targetDeviceToAlignedMap.put(
+          ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readBool(byteBuffer));
+    }
+    return new DeviceViewIntoPathDescriptor(
+        deviceToSourceTargetPathPairListMap, targetDeviceToAlignedMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DeviceViewIntoPathDescriptor that = (DeviceViewIntoPathDescriptor) o;
+    return deviceToSourceTargetPathPairListMap.equals(that.deviceToSourceTargetPathPairListMap)
+        && targetDeviceToAlignedMap.equals(that.targetDeviceToAlignedMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(deviceToSourceTargetPathPairListMap, targetDeviceToAlignedMap);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
new file mode 100644
index 0000000000..6d55cf1e2e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+public class IntoPathDescriptor {
+
+  // List<(sourceColumn, targetPath)>
+  private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
+
+  // targetDevice -> isAlignedDevice
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+
+  public IntoPathDescriptor() {
+    this.sourceTargetPathPairList = new ArrayList<>();
+    this.targetDeviceToAlignedMap = new HashMap<>();
+  }
+
+  public IntoPathDescriptor(
+      List<Pair<String, PartialPath>> sourceTargetPathPairList,
+      Map<String, Boolean> targetDeviceToAlignedMap) {
+    this.sourceTargetPathPairList = sourceTargetPathPairList;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+  }
+
+  public void specifyTargetPath(String sourceColumn, PartialPath targetPath) {
+    sourceTargetPathPairList.add(new Pair<>(sourceColumn, targetPath));
+  }
+
+  public void specifyDeviceAlignment(String targetDevice, boolean isAligned) {
+    if (targetDeviceToAlignedMap.containsKey(targetDevice)
+        && targetDeviceToAlignedMap.get(targetDevice) != isAligned) {
+      throw new SemanticException(
+          "select into: alignment property must be the same for the same device.");
+    }
+    targetDeviceToAlignedMap.put(targetDevice, isAligned);
+  }
+
+  public void validate() {
+    List<PartialPath> targetPaths =
+        sourceTargetPathPairList.stream().map(Pair::getRight).collect(Collectors.toList());
+    if (targetPaths.size() > new HashSet<>(targetPaths).size()) {
+      throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+    }
+  }
+
+  public List<Pair<String, PartialPath>> getSourceTargetPathPairList() {
+    return sourceTargetPathPairList;
+  }
+
+  public Map<String, Boolean> getTargetDeviceToAlignedMap() {
+    return targetDeviceToAlignedMap;
+  }
+
+  public Map<PartialPath, Map<String, String>> getTargetPathToSourceMap() {
+    // targetDevice -> { targetMeasurement -> sourceColumn }
+    Map<PartialPath, Map<String, String>> targetPathToSourceMap = new HashMap<>();
+
+    for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
+      String sourceColumn = sourceTargetPathPair.left;
+      PartialPath targetDevice = sourceTargetPathPair.right.getDevicePath();
+      String targetMeasurement = sourceTargetPathPair.right.getMeasurement();
+
+      targetPathToSourceMap
+          .computeIfAbsent(targetDevice, key -> new HashMap<>())
+          .put(targetMeasurement, sourceColumn);
+    }
+    return targetPathToSourceMap;
+  }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(sourceTargetPathPairList.size(), byteBuffer);
+    for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
+      ReadWriteIOUtils.write(sourceTargetPathPair.left, byteBuffer);
+      sourceTargetPathPair.right.serialize(byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), byteBuffer);
+    for (Map.Entry<String, Boolean> entry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+    }
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(sourceTargetPathPairList.size(), stream);
+    for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
+      ReadWriteIOUtils.write(sourceTargetPathPair.left, stream);
+      sourceTargetPathPair.right.serialize(stream);
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), stream);
+    for (Map.Entry<String, Boolean> entry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue(), stream);
+    }
+  }
+
+  public static IntoPathDescriptor deserialize(ByteBuffer byteBuffer) {
+    int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Pair<String, PartialPath>> sourceTargetPathPairList = new ArrayList<>(listSize);
+    for (int i = 0; i < listSize; i++) {
+      sourceTargetPathPairList.add(
+          new Pair<>(ReadWriteIOUtils.readString(byteBuffer), PartialPath.deserialize(byteBuffer)));
+    }
+
+    int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<String, Boolean> targetDeviceToAlignedMap = new HashMap<>(mapSize);
+    for (int i = 0; i < mapSize; i++) {
+      targetDeviceToAlignedMap.put(
+          ReadWriteIOUtils.readString(byteBuffer), ReadWriteIOUtils.readBool(byteBuffer));
+    }
+    return new IntoPathDescriptor(sourceTargetPathPairList, targetDeviceToAlignedMap);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    IntoPathDescriptor that = (IntoPathDescriptor) o;
+    return sourceTargetPathPairList.equals(that.sourceTargetPathPairList)
+        && targetDeviceToAlignedMap.equals(that.targetDeviceToAlignedMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sourceTargetPathPairList, targetDeviceToAlignedMap);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
new file mode 100644
index 0000000000..240a5333e4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.component;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
+
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.checkIsAllRawSeriesQuery;
+
+/** This class maintains information of {@code INTO} clause. */
+public class IntoComponent extends StatementNode {
+
+  public static String PLACEHOLDER_MISMATCH_ERROR_MSG =
+      "select into: the correspondence between the placeholder and the raw time series could not be established.";
+  public static String FORBID_PLACEHOLDER_ERROR_MSG =
+      "select into: placeholders can only be used in raw time series data queries.";
+  public static String DEVICE_NUM_MISMATCH_ERROR_MSG =
+      "select into: the number of source devices and the number of target devices should be the same.";
+  public static String PATH_NUM_MISMATCH_ERROR_MSG =
+      "select into: the number of source columns and the number of target paths should be the same.";
+  public static String DUPLICATE_TARGET_PATH_ERROR_MSG =
+      "select into: target paths in into clause should be different.";
+
+  private final List<IntoItem> intoItems;
+
+  public IntoComponent(List<IntoItem> intoItems) {
+    this.intoItems = intoItems;
+  }
+
+  public boolean isDeviceExistPlaceholder() {
+    for (IntoItem intoItem : intoItems) {
+      if (intoItem.isDeviceExistPlaceholder()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public boolean isMeasurementsExistPlaceholder() {
+    for (IntoItem intoItem : intoItems) {
+      if (intoItem.isMeasurementsExistPlaceholder()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // used in ALIGN BY TIME
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public void validate(List<Expression> sourceColumns) {
+    boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(sourceColumns);
+
+    if (!isAllRawSeriesQuery) {
+      if (isDeviceExistPlaceholder() || isMeasurementsExistPlaceholder()) {
+        throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+      }
+    }
+
+    if (isMeasurementsExistPlaceholder()) {
+      for (IntoItem intoItem : intoItems) {
+        if (intoItem.getIntoMeasurements().size() != 1) {
+          throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+        }
+      }
+
+      if (isDeviceExistPlaceholder()) {
+        if (intoItems.size() != 1) {
+          throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+        }
+      } else {
+        if (intoItems.size() != sourceColumns.size()) {
+          throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+        }
+      }
+    } else {
+      int intoPathsNum =
+          intoItems.stream().mapToInt(item -> item.getIntoMeasurements().size()).sum();
+      if (intoPathsNum != sourceColumns.size()) {
+        throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+      }
+    }
+  }
+
+  public IntoPathIterator getIntoPathIterator() {
+    return new IntoPathIterator(
+        intoItems, isDeviceExistPlaceholder(), isMeasurementsExistPlaceholder());
+  }
+
+  public static class IntoPathIterator extends AbstractIntoIterator {
+
+    public IntoPathIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+    }
+
+    public void next() {
+      if (isMeasurementsExistPlaceholder) {
+        if (!isDeviceExistPlaceholder) {
+          deviceIndex++;
+        }
+      } else {
+        measurementIndex++;
+        if (measurementIndex == intoItems.get(deviceIndex).getIntoMeasurements().size()) {
+          deviceIndex++;
+          measurementIndex = 0;
+        }
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // used in ALIGN BY DEVICE
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public void validate(List<PartialPath> sourceDevices, List<Expression> sourceColumns) {
+    boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(sourceColumns);
+
+    if (!isAllRawSeriesQuery) {
+      if (isMeasurementsExistPlaceholder()) {
+        throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+      }
+    }
+
+    if (isDeviceExistPlaceholder()) {
+      if (intoItems.size() != 1) {
+        throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+      }
+    } else {
+      if (intoItems.size() != sourceDevices.size()) {
+        throw new SemanticException(DEVICE_NUM_MISMATCH_ERROR_MSG);
+      }
+    }
+
+    for (IntoItem intoItem : intoItems) {
+      List<String> intoMeasurements = intoItem.getIntoMeasurements();
+      if (intoItem.isMeasurementsExistPlaceholder()) {
+        if (intoMeasurements.size() != 1) {
+          throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+        }
+      } else {
+        if (intoMeasurements.size() != sourceColumns.size()) {
+          throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+        }
+      }
+    }
+  }
+
+  public IntoDeviceMeasurementIterator getIntoDeviceMeasurementIterator() {
+    return new IntoDeviceMeasurementIterator(
+        intoItems, isDeviceExistPlaceholder(), isMeasurementsExistPlaceholder());
+  }
+
+  public static class IntoDeviceMeasurementIterator extends AbstractIntoIterator {
+
+    public IntoDeviceMeasurementIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+    }
+
+    public void nextDevice() {
+      if (!isDeviceExistPlaceholder) {
+        deviceIndex++;
+        measurementIndex = 0;
+      }
+    }
+
+    public void nextMeasurement() {
+      if (!intoItems.get(deviceIndex).isMeasurementsExistPlaceholder()) {
+        measurementIndex++;
+        if (measurementIndex == intoItems.get(deviceIndex).getIntoMeasurements().size()) {
+          measurementIndex = 0;
+        }
+      }
+    }
+  }
+
+  public abstract static class AbstractIntoIterator {
+
+    protected final List<IntoItem> intoItems;
+
+    protected final boolean isDeviceExistPlaceholder;
+    protected final boolean isMeasurementsExistPlaceholder;
+
+    protected int deviceIndex;
+    protected int measurementIndex;
+
+    public AbstractIntoIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      this.intoItems = intoItems;
+      this.isDeviceExistPlaceholder = isDeviceExistPlaceholder;
+      this.isMeasurementsExistPlaceholder = isMeasurementsExistPlaceholder;
+      this.deviceIndex = 0;
+      this.measurementIndex = 0;
+    }
+
+    public PartialPath getDeviceTemplate() {
+      return intoItems.get(deviceIndex).getIntoDevice();
+    }
+
+    public String getMeasurementTemplate() {
+      return intoItems.get(deviceIndex).getIntoMeasurements().get(measurementIndex);
+    }
+
+    public boolean isAlignedDevice() {
+      return intoItems.get(deviceIndex).isAligned();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoItem.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoItem.java
new file mode 100644
index 0000000000..4efafe3cef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoItem.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.component;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+
+public class IntoItem extends StatementNode {
+
+  private final PartialPath intoDevice;
+  private final List<String> intoMeasurements;
+  private final boolean isAligned;
+
+  public IntoItem(PartialPath intoDevice, List<String> intoMeasurements, boolean isAligned) {
+    this.intoDevice = intoDevice;
+    this.intoMeasurements = intoMeasurements;
+    this.isAligned = isAligned;
+  }
+
+  public PartialPath getIntoDevice() {
+    return intoDevice;
+  }
+
+  public List<String> getIntoMeasurements() {
+    return intoMeasurements;
+  }
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  public boolean isDeviceExistPlaceholder() {
+    return intoDevice.containNode(DOUBLE_COLONS)
+        || LEVELED_PATH_TEMPLATE_PATTERN.matcher(intoDevice.getFullPath()).find();
+  }
+
+  public boolean isMeasurementsExistPlaceholder() {
+    for (String measurement : intoMeasurements) {
+      if (measurement.equals(DOUBLE_COLONS)
+          || LEVELED_PATH_TEMPLATE_PATTERN.matcher(measurement).find()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public List<PartialPath> getIntoPaths() {
+    return intoMeasurements.stream().map(intoDevice::concatNode).collect(Collectors.toList());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index a4011d0ed4..ae6586de5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.HavingCondition;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
@@ -98,6 +99,9 @@ public class QueryStatement extends Statement {
   // `GROUP BY TAG` clause
   protected GroupByTagComponent groupByTagComponent;
 
+  // `INTO` clause
+  protected IntoComponent intoComponent;
+
   public QueryStatement() {
     this.statementType = StatementType.QUERY;
   }
@@ -254,6 +258,10 @@ public class QueryStatement extends Statement {
     return groupByTimeComponent != null;
   }
 
+  public boolean isAlignByTime() {
+    return resultSetFormat == ResultSetFormat.ALIGN_BY_TIME;
+  }
+
   public boolean isAlignByDevice() {
     return resultSetFormat == ResultSetFormat.ALIGN_BY_DEVICE;
   }
@@ -274,6 +282,14 @@ public class QueryStatement extends Statement {
     return orderByComponent != null && orderByComponent.isOrderByDevice();
   }
 
+  public IntoComponent getIntoComponent() {
+    return intoComponent;
+  }
+
+  public void setIntoComponent(IntoComponent intoComponent) {
+    this.intoComponent = intoComponent;
+  }
+
   public Ordering getResultTimeOrder() {
     if (orderByComponent == null || !orderByComponent.isOrderByTime()) {
       return Ordering.ASC;
@@ -288,6 +304,10 @@ public class QueryStatement extends Statement {
     return orderByComponent.getSortItemList();
   }
 
+  public boolean isSelectInto() {
+    return intoComponent != null;
+  }
+
   public void semanticCheck() {
     if (isAggregationQuery()) {
       if (disableAlign()) {
@@ -385,6 +405,24 @@ public class QueryStatement extends Statement {
             "Sorting by device is only supported in ALIGN BY DEVICE queries.");
       }
     }
+
+    if (isSelectInto()) {
+      if (getSeriesLimit() > 0) {
+        throw new SemanticException("select into: slimit clauses are not supported.");
+      }
+      if (getSeriesOffset() > 0) {
+        throw new SemanticException("select into: soffset clauses are not supported.");
+      }
+      if (disableAlign()) {
+        throw new SemanticException("select into: disable align clauses are not supported.");
+      }
+      if (isLastQuery()) {
+        throw new SemanticException("select into: last clauses are not supported.");
+      }
+      if (isGroupByTag()) {
+        throw new SemanticException("select into: GROUP BY TAGS clause are not supported.");
+      }
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index f8dfc07c97..d6fa5100e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -1271,25 +1271,31 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
 
   private SelectIntoOperator parseAndConstructSelectIntoOperator(
       IoTDBSqlParser.SelectStatementContext ctx) {
+    IoTDBSqlParser.IntoClauseContext intoClauseContext = ctx.intoClause();
+    if (intoClauseContext.intoItem().size() > 0) {
+      throw new SQLParserException(
+          "The new syntax of SELECT INTO statement will be supported starting from v0.14");
+    }
+
     if (queryOp.getFromComponent().getPrefixPaths().size() != 1) {
       throw new SQLParserException(
           "select into: the number of prefix paths in the from clause should be 1.");
     }
-
     int sourcePathsCount = queryOp.getSelectComponent().getResultColumns().size();
-    if (sourcePathsCount != ctx.intoClause().intoPath().size()) {
+    if (sourcePathsCount != intoClauseContext.intoPath().size()) {
       throw new SQLParserException(
           "select into: the number of source paths and the number of target paths should be the same.");
     }
 
     SelectIntoOperator selectIntoOperator = new SelectIntoOperator();
     selectIntoOperator.setQueryOperator(queryOp);
+
     List<PartialPath> intoPaths = new ArrayList<>();
     for (int i = 0; i < sourcePathsCount; ++i) {
-      intoPaths.add(parseIntoPath(ctx.intoClause().intoPath(i)));
+      intoPaths.add(parseIntoPath(intoClauseContext.intoPath(i)));
     }
     selectIntoOperator.setIntoPaths(intoPaths);
-    selectIntoOperator.setIntoPathsAligned(ctx.intoClause().ALIGNED() != null);
+    selectIntoOperator.setIntoPathsAligned(intoClauseContext.ALIGNED() != null);
     return selectIntoOperator;
   }
 
@@ -1304,8 +1310,9 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
     }
 
     PartialPath intoPath = null;
-    if (intoPathContext.fullPath() != null) {
-      intoPath = parseFullPathInSelectInto(intoPathContext.fullPath());
+    if (intoPathContext instanceof IoTDBSqlParser.FullPathInIntoPathContext) {
+      intoPath =
+          parseFullPathInSelectInto((IoTDBSqlParser.FullPathInIntoPathContext) intoPathContext);
 
       Matcher m = leveledPathNodePattern.matcher(intoPath.getFullPath());
       while (m.find()) {
@@ -1321,9 +1328,9 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
               "the x of ${x} should be greater than 0 and equal to or less than <level> or the length of queried path prefix.");
         }
       }
-    } else if (intoPathContext.nodeNameWithoutWildcard() != null) {
-      List<IoTDBSqlParser.NodeNameWithoutWildcardContext> nodeNameWithoutStars =
-          intoPathContext.nodeNameWithoutWildcard();
+    } else if (intoPathContext instanceof IoTDBSqlParser.SuffixPathInIntoPathContext) {
+      List<IoTDBSqlParser.NodeNameInIntoPathContext> nodeNameWithoutStars =
+          ((IoTDBSqlParser.SuffixPathInIntoPathContext) intoPathContext).nodeNameInIntoPath();
       String[] intoPathNodes =
           new String[1 + levelLimitOfSourcePrefixPath + nodeNameWithoutStars.size()];
 
@@ -2565,15 +2572,14 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
     return new PartialPath(path);
   }
 
-  private PartialPath parseFullPathInSelectInto(IoTDBSqlParser.FullPathContext ctx) {
-    List<IoTDBSqlParser.NodeNameWithoutWildcardContext> nodeNamesWithoutStar =
-        ctx.nodeNameWithoutWildcard();
+  private PartialPath parseFullPathInSelectInto(IoTDBSqlParser.FullPathInIntoPathContext ctx) {
+    List<IoTDBSqlParser.NodeNameInIntoPathContext> nodeNamesWithoutStar = ctx.nodeNameInIntoPath();
     String[] path = new String[nodeNamesWithoutStar.size() + 1];
     int i = 0;
     if (ctx.ROOT() != null) {
       path[0] = ctx.ROOT().getText();
     }
-    for (IoTDBSqlParser.NodeNameWithoutWildcardContext nodeNameWithoutStar : nodeNamesWithoutStar) {
+    for (IoTDBSqlParser.NodeNameInIntoPathContext nodeNameWithoutStar : nodeNamesWithoutStar) {
       i++;
       path[i] = parseNodeNameWithoutWildCardInSelectInto(nodeNameWithoutStar);
     }
@@ -2642,7 +2648,7 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
 
   /** in select into, $ and {} are allowed */
   private String parseNodeNameWithoutWildCardInSelectInto(
-      IoTDBSqlParser.NodeNameWithoutWildcardContext ctx) {
+      IoTDBSqlParser.NodeNameInIntoPathContext ctx) {
     String nodeName = ctx.getText();
     if (nodeName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)
         || nodeName.equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index 02d4643fe5..1f51560c06 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.ratis.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.ratis.thirdparty.com.google.common.collect.Sets;
@@ -51,9 +52,13 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.time.ZonedDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
 import static org.junit.Assert.assertEquals;
@@ -585,6 +590,170 @@ public class AnalyzeTest {
         1);
   }
 
+  @Test
+  public void testSelectIntoPath() throws IllegalPathException {
+    List<String> sqls =
+        Arrays.asList(
+            "SELECT s1, s2 INTO root.sg_copy.d1(t1, t2), root.sg_copy.d2(t1, t2) FROM root.sg.d1, root.sg.d2;",
+            "SELECT s1, s2 INTO root.sg_copy.d1(t1, t2, t3, t4) FROM root.sg.d1, root.sg.d2;",
+            "SELECT s1, s2 INTO root.sg_copy.d1(t1), root.sg_copy.d2(t1, t2), root.sg_copy.d3(t1) FROM root.sg.d1, root.sg.d2;",
+            "select count(s1 + s2), last_value(s2) into root.agg.count(s1_add_s2), root.agg.last_value(s2) from root.sg.d1 group by ([0, 100), 10ms);",
+            "select s1 + s2 into root.expr.add(d1s1_d1s2, d1s1_d2s2, d2s1_d1s2, d2s1_d2s2) from root.sg.d1, root.sg.d2;",
+            "select s1, s1 into root.sg_copy.d1(s1, s2)  from root.sg.d1",
+            "select s1, s1 into root.sg_copy.d1(s1), root.sg_copy.d2(s1)  from root.sg.d1",
+            "select s1, s2 into ::(t1, t1, t2, t2) from root.sg.*;",
+            "select s1, s2 into root.sg_copy.::(::) from root.sg.*;",
+            "select s1, s2 into root.sg_copy.d1_copy(${2}_${3}), root.sg_copy.d1_copy(${2}_${3}), root.sg_copy.d2_copy(${2}_${3}), root.sg_copy.d2_copy(${2}_${3}) from root.sg.d1, root.sg.d2;",
+            "select d1.s1, d1.s2, d2.s1, d2.s2 into ::(s1_1, s2_2), root.sg.d2_2(s3_3), root.backup_${1}.::(s4) from root.sg");
+    List<List<Pair<String, PartialPath>>> results =
+        Arrays.asList(
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.t1")),
+                new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d1.t2")),
+                new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d2.t1")),
+                new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d2.t2"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.t1")),
+                new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d1.t2")),
+                new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d1.t3")),
+                new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d1.t4"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.t1")),
+                new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d2.t1")),
+                new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d2.t2")),
+                new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d3.t1"))),
+            Arrays.asList(
+                new Pair<>(
+                    "count(root.sg.d1.s1 + root.sg.d1.s2)",
+                    new PartialPath("root.agg.count.s1_add_s2")),
+                new Pair<>("last_value(root.sg.d1.s2)", new PartialPath("root.agg.last_value.s2"))),
+            Arrays.asList(
+                new Pair(
+                    "root.sg.d1.s1 + root.sg.d1.s2", new PartialPath("root.expr.add.d1s1_d1s2")),
+                new Pair(
+                    "root.sg.d1.s1 + root.sg.d2.s2", new PartialPath("root.expr.add.d1s1_d2s2")),
+                new Pair(
+                    "root.sg.d2.s1 + root.sg.d1.s2", new PartialPath("root.expr.add.d2s1_d1s2")),
+                new Pair(
+                    "root.sg.d2.s1 + root.sg.d2.s2", new PartialPath("root.expr.add.d2s1_d2s2"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s1")),
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s2"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s1")),
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d2.s1"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg.d1.t1")),
+                new Pair("root.sg.d2.s1", new PartialPath("root.sg.d2.t1")),
+                new Pair("root.sg.d1.s2", new PartialPath("root.sg.d1.t2")),
+                new Pair("root.sg.d2.s2", new PartialPath("root.sg.d2.t2"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s1")),
+                new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d2.s1")),
+                new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d1.s2")),
+                new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d2.s2"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1_copy.d1_s1")),
+                new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d1_copy.d2_s1")),
+                new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d2_copy.d1_s2")),
+                new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d2_copy.d2_s2"))),
+            Arrays.asList(
+                new Pair("root.sg.d1.s1", new PartialPath("root.sg.d1.s1_1")),
+                new Pair("root.sg.d1.s2", new PartialPath("root.sg.d1.s2_2")),
+                new Pair("root.sg.d2.s1", new PartialPath("root.sg.d2_2.s3_3")),
+                new Pair("root.sg.d2.s2", new PartialPath("root.backup_sg.d2.s4"))));
+
+    for (int i = 0; i < sqls.size(); i++) {
+      Analysis analysis = analyzeSQL(sqls.get(i));
+      assert analysis != null;
+      Assert.assertEquals(
+          results.get(i), analysis.getIntoPathDescriptor().getSourceTargetPathPairList());
+    }
+  }
+
+  @Test
+  public void testSelectIntoPathAlignByDevice() throws IllegalPathException {
+    List<String> sqls =
+        Arrays.asList(
+            "select s1, s2 into root.sg_copy.::(t1, t2) from root.sg.d1, root.sg.d2 align by device;",
+            "select s1 + s2 into root.expr.add(d1s1_d1s2), root.expr.add(d2s1_d2s2) from root.sg.d1, root.sg.d2 align by device;",
+            "select count(s1), last_value(s2) into root.agg.::(count_s1, last_value_s2) from root.sg.d1, root.sg.d2 group by ([0, 100), 10ms) align by device;",
+            "select s1, s2 into root.sg1.new_d1(::), root.sg2.new_d2(::) from root.sg.d1, root.sg.d2 align by device;",
+            "select s1, s2 into root.sg1.new_${2}(::) from root.sg.d1, root.sg.d2 align by device;");
+
+    List<Map<String, List<Pair<String, PartialPath>>>> results = new ArrayList<>();
+    Map<String, List<Pair<String, PartialPath>>> resultMap1 = new HashMap<>();
+    resultMap1.put(
+        "root.sg.d1",
+        Arrays.asList(
+            new Pair<>("s1", new PartialPath("root.sg_copy.d1.t1")),
+            new Pair<>("s2", new PartialPath("root.sg_copy.d1.t2"))));
+    resultMap1.put(
+        "root.sg.d2",
+        Arrays.asList(
+            new Pair<>("s1", new PartialPath("root.sg_copy.d2.t1")),
+            new Pair<>("s2", new PartialPath("root.sg_copy.d2.t2"))));
+    results.add(resultMap1);
+
+    Map<String, List<Pair<String, PartialPath>>> resultMap2 = new HashMap<>();
+    resultMap2.put(
+        "root.sg.d1",
+        Collections.singletonList(
+            new Pair<>("s1 + s2", new PartialPath("root.expr.add.d1s1_d1s2"))));
+    resultMap2.put(
+        "root.sg.d2",
+        Collections.singletonList(
+            new Pair<>("s1 + s2", new PartialPath("root.expr.add.d2s1_d2s2"))));
+    results.add(resultMap2);
+
+    Map<String, List<Pair<String, PartialPath>>> resultMap3 = new HashMap<>();
+    resultMap3.put(
+        "root.sg.d1",
+        Arrays.asList(
+            new Pair<>("count(s1)", new PartialPath("root.agg.d1.count_s1")),
+            new Pair<>("last_value(s2)", new PartialPath("root.agg.d1.last_value_s2"))));
+    resultMap3.put(
+        "root.sg.d2",
+        Arrays.asList(
+            new Pair<>("count(s1)", new PartialPath("root.agg.d2.count_s1")),
+            new Pair<>("last_value(s2)", new PartialPath("root.agg.d2.last_value_s2"))));
+    results.add(resultMap3);
+
+    Map<String, List<Pair<String, PartialPath>>> resultMap4 = new HashMap<>();
+    resultMap4.put(
+        "root.sg.d1",
+        Arrays.asList(
+            new Pair<>("s1", new PartialPath("root.sg1.new_d1.s1")),
+            new Pair<>("s2", new PartialPath("root.sg1.new_d1.s2"))));
+    resultMap4.put(
+        "root.sg.d2",
+        Arrays.asList(
+            new Pair<>("s1", new PartialPath("root.sg2.new_d2.s1")),
+            new Pair<>("s2", new PartialPath("root.sg2.new_d2.s2"))));
+    results.add(resultMap4);
+
+    Map<String, List<Pair<String, PartialPath>>> resultMap5 = new HashMap<>();
+    resultMap5.put(
+        "root.sg.d1",
+        Arrays.asList(
+            new Pair<>("s1", new PartialPath("root.sg1.new_d1.s1")),
+            new Pair<>("s2", new PartialPath("root.sg1.new_d1.s2"))));
+    resultMap5.put(
+        "root.sg.d2",
+        Arrays.asList(
+            new Pair<>("s1", new PartialPath("root.sg1.new_d2.s1")),
+            new Pair<>("s2", new PartialPath("root.sg1.new_d2.s2"))));
+    results.add(resultMap5);
+
+    for (int i = 0; i < sqls.size(); i++) {
+      Analysis analysis = analyzeSQL(sqls.get(i));
+      assert analysis != null;
+      Assert.assertEquals(
+          results.get(i),
+          analysis.getDeviceViewIntoPathDescriptor().getDeviceToSourceTargetPathPairListMap());
+    }
+  }
+
   private Analysis analyzeSQL(String sql) {
     try {
       Statement statement =
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
index 86ea19abcd..956a34a953 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
@@ -44,5 +44,9 @@ public class TsFileConstant {
   private static final String NODE_NAME_MATCHER = "(\\*{0,2}[a-zA-Z0-9_\\u2E80-\\u9FFF]+\\*{0,2})";
   public static final Pattern NODE_NAME_PATTERN = Pattern.compile(NODE_NAME_MATCHER);
 
+  private static final String NODE_NAME_IN_INTO_PATH_MATCHER = "([a-zA-Z0-9_${}\\u2E80-\\u9FFF]+)";
+  public static final Pattern NODE_NAME_IN_INTO_PATH_PATTERN =
+      Pattern.compile(NODE_NAME_IN_INTO_PATH_MATCHER);
+
   private TsFileConstant() {}
 }