You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/10/18 12:12:59 UTC
[iotdb] branch master updated: [IOTDB-4256] Implement analyzer for SELECT INTO statement (#7588)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a078e88daa [IOTDB-4256] Implement analyzer for SELECT INTO statement (#7588)
a078e88daa is described below
commit a078e88daab0e1d0fedccd6bfe360dc1a10a2a10
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Tue Oct 18 20:12:53 2022 +0800
[IOTDB-4256] Implement analyzer for SELECT INTO statement (#7588)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 14 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 1 +
.../apache/iotdb/commons/conf/IoTDBConstant.java | 5 +
.../org/apache/iotdb/commons/path/PartialPath.java | 13 ++
.../db/mpp/common/header/ColumnHeaderConstant.java | 19 ++
.../db/mpp/common/header/DatasetHeaderFactory.java | 6 +
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 29 +++
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 113 +++++++++-
.../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 +++++++++
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 99 ++++++++-
.../parameter/DeviceViewIntoPathDescriptor.java | 208 ++++++++++++++++++
.../planner/plan/parameter/IntoPathDescriptor.java | 167 +++++++++++++++
.../plan/statement/component/IntoComponent.java | 237 +++++++++++++++++++++
.../db/mpp/plan/statement/component/IntoItem.java | 73 +++++++
.../db/mpp/plan/statement/crud/QueryStatement.java | 38 ++++
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 34 +--
.../iotdb/db/mpp/plan/analyze/AnalyzeTest.java | 169 +++++++++++++++
.../tsfile/common/constant/TsFileConstant.java | 4 +
18 files changed, 1308 insertions(+), 26 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 0bb4ad877d..26be682263 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -400,11 +400,21 @@ selectStatement
intoClause
: INTO ALIGNED? intoPath (COMMA intoPath)*
+ | INTO intoItem (COMMA intoItem)*
;
intoPath
- : fullPath
- | nodeNameWithoutWildcard (DOT nodeNameWithoutWildcard)*
+ : ROOT (DOT nodeNameInIntoPath)* #fullPathInIntoPath
+ | nodeNameInIntoPath (DOT nodeNameInIntoPath)* #suffixPathInIntoPath
+ ;
+
+intoItem
+ : ALIGNED? intoPath LR_BRACKET nodeNameInIntoPath (COMMA nodeNameInIntoPath)* RR_BRACKET
+ ;
+
+nodeNameInIntoPath
+ : nodeNameWithoutWildcard
+ | DOUBLE_COLON
;
specialClause
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index cdc57dad4d..af9c837c52 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -887,6 +887,7 @@ LR_BRACKET : '(';
RR_BRACKET : ')';
LS_BRACKET : '[';
RS_BRACKET : ']';
+DOUBLE_COLON: '::';
/**
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index b6c9cd7ed7..d5f647a82a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.conf;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.regex.Pattern;
public class IoTDBConstant {
@@ -283,4 +284,8 @@ public class IoTDBConstant {
V_0_12,
V_0_13
}
+
+ // select into
+ public static final Pattern LEVELED_PATH_TEMPLATE_PATTERN = Pattern.compile("\\$\\{\\w+}");
+ public static final String DOUBLE_COLONS = "::";
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index dc566ded43..8d47504e3d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -616,6 +616,19 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
return true;
}
+  /**
+   * Checks whether the first node of this path equals the given node name.
+   *
+   * @param otherNode the node name to compare against the first node
+   * @return {@code true} if this path has at least one node and its first node equals {@code
+   *     otherNode}; {@code false} otherwise (including for a path without nodes)
+   */
+  public boolean startWith(String otherNode) {
+    // guard against a path without nodes instead of failing on nodes[0]
+    return nodes.length > 0 && nodes[0].equals(otherNode);
+  }
+
+ public boolean containNode(String otherNode) {
+ for (String node : nodes) {
+ if (node.equals(otherNode)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public String toString() {
return getFullPath();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 587491ba2c..67bd84e06b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -115,6 +115,12 @@ public class ColumnHeaderConstant {
public static final String COLUMN_PIPE_STATUS = "status";
public static final String COLUMN_PIPE_MESSAGE = "message";
+ // column names for select into
+ public static final String COLUMN_SOURCE_DEVICE = "source device";
+ public static final String COLUMN_SOURCE_COLUMN = "source column";
+ public static final String COLUMN_TARGET_TIMESERIES = "target timeseries";
+ public static final String COLUMN_WRITTEN = "written";
+
public static final List<ColumnHeader> lastQueryColumnHeaders =
ImmutableList.of(
new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
@@ -276,6 +282,19 @@ public class ColumnHeaderConstant {
new ColumnHeader(COLUMN_PIPE_STATUS, TSDataType.TEXT),
new ColumnHeader(COLUMN_PIPE_MESSAGE, TSDataType.TEXT));
+ public static final List<ColumnHeader> selectIntoColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(COLUMN_SOURCE_COLUMN, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_TARGET_TIMESERIES, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_WRITTEN, TSDataType.INT32));
+
+ public static final List<ColumnHeader> selectIntoAlignByDeviceColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(COLUMN_SOURCE_DEVICE, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_SOURCE_COLUMN, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_TARGET_TIMESERIES, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_WRITTEN, TSDataType.INT32));
+
public static final List<ColumnHeader> getRoutingColumnHeaders =
ImmutableList.of(new ColumnHeader(COLUMN_REGION_ID, TSDataType.INT32));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index 835e6df4a2..7c537f791e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -140,4 +140,10 @@ public class DatasetHeaderFactory {
public static DatasetHeader getGetTimeSlotListHeader() {
return new DatasetHeader(ColumnHeaderConstant.getTimeSlotListColumnHeaders, true);
}
+
+  /**
+   * Creates the result header of a SELECT INTO statement.
+   *
+   * @param isAlignByDevice whether the query is ALIGN BY DEVICE; if so the header built from
+   *     {@code selectIntoAlignByDeviceColumnHeaders} is used, otherwise the one built from {@code
+   *     selectIntoColumnHeaders}
+   * @return a new {@link DatasetHeader} for the selected column set
+   */
+  public static DatasetHeader getSelectIntoHeader(boolean isAlignByDevice) {
+    if (isAlignByDevice) {
+      return new DatasetHeader(ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders, true);
+    }
+    return new DatasetHeader(ColumnHeaderConstant.selectIntoColumnHeaders, true);
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 8e346a03a8..5cec1e6d04 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -29,8 +29,10 @@ import org.apache.iotdb.db.mpp.common.NodeRef;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -157,6 +159,16 @@ public class Analysis {
// header of result dataset
private DatasetHeader respDatasetHeader;
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // SELECT INTO Analysis
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ // used in ALIGN BY DEVICE
+ private DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor;
+
+ // used in ALIGN BY TIME
+ private IntoPathDescriptor intoPathDescriptor;
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// Schema Query Analysis
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -437,6 +449,23 @@ public class Analysis {
this.deviceViewOutputExpressions = deviceViewOutputExpressions;
}
+ /** Returns the SELECT INTO descriptor used in ALIGN BY DEVICE ({@code null} until it is set). */
+ public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
+ return deviceViewIntoPathDescriptor;
+ }
+
+ /** Stores the SELECT INTO descriptor used in ALIGN BY DEVICE. */
+ public void setDeviceViewIntoPathDescriptor(
+ DeviceViewIntoPathDescriptor deviceViewIntoPathDescriptor) {
+ this.deviceViewIntoPathDescriptor = deviceViewIntoPathDescriptor;
+ }
+
+ /** Returns the SELECT INTO descriptor used in ALIGN BY TIME ({@code null} until it is set). */
+ public IntoPathDescriptor getIntoPathDescriptor() {
+ return intoPathDescriptor;
+ }
+
+ /** Stores the SELECT INTO descriptor used in ALIGN BY TIME. */
+ public void setIntoPathDescriptor(IntoPathDescriptor intoPathDescriptor) {
+ this.intoPathDescriptor = intoPathDescriptor;
+ }
+
public List<String> getTagKeys() {
return tagKeys;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index db5dd1bc2c..77e086f4eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -52,14 +52,17 @@ import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
@@ -147,6 +150,9 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDC
import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetDevice;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetMeasurement;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetPath;
/** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> {
@@ -203,6 +209,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
logger.info("[EndFetchSchema]");
// If there is no leaf node in the schema tree, the query should be completed immediately
if (schemaTree.isEmpty()) {
+ if (queryStatement.isSelectInto()) {
+ analysis.setRespDatasetHeader(
+ DatasetHeaderFactory.getSelectIntoHeader(queryStatement.isAlignByDevice()));
+ }
if (queryStatement.isLastQuery()) {
analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader());
}
@@ -237,6 +247,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
analyzeDeviceToSource(analysis, queryStatement);
analyzeDeviceView(analysis, queryStatement, outputExpressions);
+
+ analyzeInto(analysis, queryStatement, deviceSet, outputExpressions);
} else {
outputExpressions = analyzeSelect(analysis, queryStatement, schemaTree);
@@ -255,6 +267,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
analyzeSourceTransform(analysis, queryStatement);
analyzeSource(analysis, queryStatement);
+
+ analyzeInto(analysis, queryStatement, outputExpressions);
}
analyzeGroupBy(analysis, queryStatement);
@@ -409,15 +423,15 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
// device path patterns in FROM clause
List<PartialPath> devicePatternList = queryStatement.getFromComponent().getPrefixPaths();
- Set<PartialPath> deviceList = new LinkedHashSet<>();
+ Set<PartialPath> deviceSet = new LinkedHashSet<>();
for (PartialPath devicePattern : devicePatternList) {
// get all matched devices
- deviceList.addAll(
+ deviceSet.addAll(
schemaTree.getMatchedDevices(devicePattern).stream()
.map(DeviceSchemaInfo::getDevicePath)
.collect(Collectors.toList()));
}
- return deviceList;
+ return deviceSet;
}
private List<Pair<Expression, String>> analyzeSelect(
@@ -996,6 +1010,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
List<Pair<Expression, String>> outputExpressions) {
+ if (queryStatement.isSelectInto()) {
+ analysis.setRespDatasetHeader(
+ DatasetHeaderFactory.getSelectIntoHeader(queryStatement.isAlignByDevice()));
+ return;
+ }
+
boolean isIgnoreTimestamp =
queryStatement.isAggregationQuery() && !queryStatement.isGroupByTime();
List<ColumnHeader> columnHeaders = new ArrayList<>();
@@ -1072,6 +1092,93 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
}
+  /**
+   * Analyzes the INTO clause of an ALIGN BY DEVICE query and stores the resulting {@link
+   * DeviceViewIntoPathDescriptor} in the analysis. Does nothing if the statement is not a SELECT
+   * INTO statement.
+   *
+   * @param analysis the analysis that receives the descriptor
+   * @param queryStatement the statement being analyzed
+   * @param deviceSet the source devices of the query
+   * @param outputExpressions the output expressions; only the left element of each pair is used
+   */
+  private void analyzeInto(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      Set<PartialPath> deviceSet,
+      List<Pair<Expression, String>> outputExpressions) {
+    if (!queryStatement.isSelectInto()) {
+      return;
+    }
+
+    List<PartialPath> sourceDevices = new ArrayList<>(deviceSet);
+    List<Expression> sourceColumns = new ArrayList<>();
+    for (Pair<Expression, String> outputExpression : outputExpressions) {
+      sourceColumns.add(outputExpression.getLeft());
+    }
+
+    IntoComponent intoComponent = queryStatement.getIntoComponent();
+    intoComponent.validate(sourceDevices, sourceColumns);
+
+    DeviceViewIntoPathDescriptor descriptor = new DeviceViewIntoPathDescriptor();
+    IntoComponent.IntoDeviceMeasurementIterator templateIterator =
+        intoComponent.getIntoDeviceMeasurementIterator();
+    for (PartialPath sourceDevice : sourceDevices) {
+      // the device-level template is read once per source device
+      PartialPath deviceTemplate = templateIterator.getDeviceTemplate();
+      boolean isAlignedDevice = templateIterator.isAlignedDevice();
+      PartialPath targetDevice = constructTargetDevice(sourceDevice, deviceTemplate);
+      descriptor.specifyDeviceAlignment(targetDevice.toString(), isAlignedDevice);
+
+      for (Expression sourceColumn : sourceColumns) {
+        String measurementTemplate = templateIterator.getMeasurementTemplate();
+        // only raw series are resolved against the template; any other expression uses the
+        // measurement template as-is
+        String targetMeasurement =
+            sourceColumn instanceof TimeSeriesOperand
+                ? constructTargetMeasurement(
+                    sourceDevice.concatNode(sourceColumn.toString()), measurementTemplate)
+                : measurementTemplate;
+        descriptor.specifyTargetDeviceMeasurement(
+            sourceDevice, targetDevice, sourceColumn.toString(), targetMeasurement);
+        templateIterator.nextMeasurement();
+      }
+
+      templateIterator.nextDevice();
+    }
+    descriptor.validate();
+    analysis.setDeviceViewIntoPathDescriptor(descriptor);
+  }
+
+  /**
+   * Analyzes the INTO clause of an ALIGN BY TIME query and stores the resulting {@link
+   * IntoPathDescriptor} in the analysis. Does nothing if the statement is not a SELECT INTO
+   * statement.
+   *
+   * @param analysis the analysis that receives the descriptor
+   * @param queryStatement the statement being analyzed
+   * @param outputExpressions the output expressions; only the left element of each pair is used
+   */
+  private void analyzeInto(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      List<Pair<Expression, String>> outputExpressions) {
+    if (!queryStatement.isSelectInto()) {
+      return;
+    }
+
+    List<Expression> sourceColumns = new ArrayList<>();
+    for (Pair<Expression, String> outputExpression : outputExpressions) {
+      sourceColumns.add(outputExpression.getLeft());
+    }
+
+    IntoComponent intoComponent = queryStatement.getIntoComponent();
+    intoComponent.validate(sourceColumns);
+
+    IntoPathDescriptor descriptor = new IntoPathDescriptor();
+    IntoComponent.IntoPathIterator templateIterator = intoComponent.getIntoPathIterator();
+    for (Expression sourceColumn : sourceColumns) {
+      PartialPath deviceTemplate = templateIterator.getDeviceTemplate();
+      String measurementTemplate = templateIterator.getMeasurementTemplate();
+      boolean isAlignedDevice = templateIterator.isAlignedDevice();
+
+      // only raw series are resolved against the templates; any other expression is written to
+      // the template path as-is
+      PartialPath targetPath =
+          sourceColumn instanceof TimeSeriesOperand
+              ? constructTargetPath(
+                  ((TimeSeriesOperand) sourceColumn).getPath(), deviceTemplate, measurementTemplate)
+              : deviceTemplate.concatNode(measurementTemplate);
+      descriptor.specifyTargetPath(sourceColumn.toString(), targetPath);
+      descriptor.specifyDeviceAlignment(targetPath.getDevicePath().toString(), isAlignedDevice);
+      templateIterator.next();
+    }
+    descriptor.validate();
+    analysis.setIntoPathDescriptor(descriptor);
+  }
+
/**
* Check datatype consistency in ALIGN BY DEVICE.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
new file mode 100644
index 0000000000..18107823e5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+
+/** Helpers that resolve the target paths of a SELECT INTO statement from path templates. */
+public class SelectIntoUtils {
+
+  /**
+   * Builds the target path of a source series from a device template and a measurement template.
+   *
+   * @param sourcePath full path of the source series
+   * @param deviceTemplate template of the target device
+   * @param measurementTemplate template of the target measurement
+   * @return the resolved target device concatenated with the resolved target measurement
+   * @throws SemanticException if a template is malformed (see {@link #constructTargetDevice} and
+   *     {@link #constructTargetMeasurement})
+   */
+  public static PartialPath constructTargetPath(
+      PartialPath sourcePath, PartialPath deviceTemplate, String measurementTemplate) {
+    PartialPath targetDevice = constructTargetDevice(sourcePath.getDevicePath(), deviceTemplate);
+    String targetMeasurement = constructTargetMeasurement(sourcePath, measurementTemplate);
+    return targetDevice.concatNode(targetMeasurement);
+  }
+
+  /**
+   * Resolves a device template against a source device.
+   *
+   * <p>Each template node has its {@code ${i}} placeholders replaced by the i-th node of the
+   * source device. A trailing {@code ::} node is replaced by all source nodes starting at the
+   * position of the {@code ::} node.
+   *
+   * @param sourceDevice the source device
+   * @param deviceTemplate the template of the target device
+   * @return the resolved target device
+   * @throws SemanticException if {@code ::} is not the last template node, or a placeholder index
+   *     is not an integer or is out of range
+   */
+  public static PartialPath constructTargetDevice(
+      PartialPath sourceDevice, PartialPath deviceTemplate) {
+    String[] sourceNodes = sourceDevice.getNodes();
+    String[] templateNodes = deviceTemplate.getNodes();
+
+    List<String> targetNodes = new ArrayList<>();
+    for (int nodeIndex = 0; nodeIndex < templateNodes.length; nodeIndex++) {
+      String curNode = templateNodes[nodeIndex];
+      if (curNode.equals(DOUBLE_COLONS)) {
+        if (nodeIndex != templateNodes.length - 1) {
+          throw new SemanticException(
+              "select into: placeholder `::` can only be used at the end of the path.");
+        }
+        // copy the source nodes from the position of `::` onwards; a dedicated index keeps the
+        // outer loop counter untouched
+        for (int sourceIndex = nodeIndex; sourceIndex < sourceNodes.length; sourceIndex++) {
+          targetNodes.add(sourceNodes[sourceIndex]);
+        }
+        break;
+      }
+
+      targetNodes.add(applyLevelPlaceholder(curNode, sourceNodes));
+    }
+    return new PartialPath(targetNodes.toArray(new String[0]));
+  }
+
+  /**
+   * Resolves a measurement template against a source series.
+   *
+   * @param sourcePath full path of the source series
+   * @param measurementTemplate {@code ::} to reuse the source measurement, otherwise a node name
+   *     that may contain {@code ${i}} placeholders
+   * @return the resolved target measurement
+   * @throws SemanticException if a placeholder index is not an integer or is out of range
+   */
+  public static String constructTargetMeasurement(
+      PartialPath sourcePath, String measurementTemplate) {
+    if (measurementTemplate.equals(DOUBLE_COLONS)) {
+      return sourcePath.getMeasurement();
+    }
+    return applyLevelPlaceholder(measurementTemplate, sourcePath.getNodes());
+  }
+
+  /**
+   * Replaces every {@code ${i}} placeholder in a template node by {@code sourceNodes[i]}.
+   *
+   * <p>The template is scanned once and the source nodes are inserted literally, so a source node
+   * containing {@code $} or {@code \} is neither interpreted as a regex group reference nor
+   * expanded again.
+   *
+   * @throws SemanticException if an index is not an integer, is less than 1, or is not less than
+   *     the number of source nodes
+   */
+  private static String applyLevelPlaceholder(String templateNode, String[] sourceNodes) {
+    Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(templateNode);
+    // StringBuffer (not StringBuilder) is the appendReplacement overload available on Java 8
+    StringBuffer resNode = new StringBuffer();
+    while (matcher.find()) {
+      String param = matcher.group();
+      int index;
+      try {
+        // strip the leading "${" and the trailing "}"
+        index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
+      } catch (NumberFormatException e) {
+        throw new SemanticException("select into: the i of ${i} should be an integer.");
+      }
+      if (index < 1 || index >= sourceNodes.length) {
+        throw new SemanticException(
+            "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
+      }
+      matcher.appendReplacement(resNode, Matcher.quoteReplacement(sourceNodes[index]));
+    }
+    matcher.appendTail(resNode);
+    return resNode.toString();
+  }
+
+  /**
+   * Checks whether every expression is a raw series.
+   *
+   * @param expressions the expressions to inspect
+   * @return {@code true} if all expressions are {@link TimeSeriesOperand}s (also for an empty
+   *     list)
+   */
+  public static boolean checkIsAllRawSeriesQuery(List<Expression> expressions) {
+    for (Expression expression : expressions) {
+      if (!(expression instanceof TimeSeriesOperand)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 3104265b4f..4267c29f4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -72,6 +72,8 @@ import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.HavingCondition;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoItem;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
@@ -810,18 +812,18 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
@Override
public Statement visitSelectStatement(IoTDBSqlParser.SelectStatementContext ctx) {
- if (ctx.intoClause() != null) {
- throw new SemanticException(
- "The SELECT-INTO statement is not supported in the current version.");
- }
-
// initialize query statement
queryStatement = new QueryStatement();
- // parser select, from
+ // parse select, from
parseSelectClause(ctx.selectClause());
parseFromClause(ctx.fromClause());
+ // parse into clause
+ if (ctx.intoClause() != null) {
+ parseIntoClause(ctx.intoClause());
+ }
+
// parse where clause
if (ctx.whereClause() != null) {
WhereCondition whereCondition = parseWhereClause(ctx.whereClause());
@@ -1405,6 +1407,44 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
}
+ // parse INTO clause
+
+  /**
+   * Parses the INTO clause and stores the resulting {@link IntoComponent} in the query statement.
+   *
+   * @throws SemanticException if the clause contains no into item
+   */
+  private void parseIntoClause(IoTDBSqlParser.IntoClauseContext ctx) {
+    List<IoTDBSqlParser.IntoItemContext> intoItemContexts = ctx.intoItem();
+    if (intoItemContexts.isEmpty()) {
+      throw new SemanticException("The syntax of SELECT INTO statement has changed from v0.14");
+    }
+
+    List<IntoItem> intoItems =
+        intoItemContexts.stream().map(this::parseIntoItem).collect(Collectors.toList());
+    queryStatement.setIntoComponent(new IntoComponent(intoItems));
+  }
+
+  /**
+   * Converts one into item of the parse tree to an {@link IntoItem}.
+   *
+   * @return an item holding the parsed device path, the parsed measurement node names, and
+   *     whether the ALIGNED keyword is present
+   */
+  private IntoItem parseIntoItem(IoTDBSqlParser.IntoItemContext intoItemContext) {
+    boolean isAligned = intoItemContext.ALIGNED() != null;
+    PartialPath intoDevice = parseIntoPath(intoItemContext.intoPath());
+
+    List<String> intoMeasurements = new ArrayList<>();
+    for (IoTDBSqlParser.NodeNameInIntoPathContext measurementContext :
+        intoItemContext.nodeNameInIntoPath()) {
+      intoMeasurements.add(parseNodeNameInIntoPath(measurementContext));
+    }
+    return new IntoItem(intoDevice, intoMeasurements, isAligned);
+  }
+
+  /**
+   * Converts an into path of the parse tree to a {@link PartialPath}.
+   *
+   * <p>A full path is delegated to {@code parseFullPathInIntoPath}; any other context is treated
+   * as a suffix path whose node names are parsed one by one.
+   */
+  private PartialPath parseIntoPath(IoTDBSqlParser.IntoPathContext intoPathContext) {
+    if (intoPathContext instanceof IoTDBSqlParser.FullPathInIntoPathContext) {
+      return parseFullPathInIntoPath((IoTDBSqlParser.FullPathInIntoPathContext) intoPathContext);
+    }
+
+    IoTDBSqlParser.SuffixPathInIntoPathContext suffixPathContext =
+        (IoTDBSqlParser.SuffixPathInIntoPathContext) intoPathContext;
+    String[] path =
+        suffixPathContext.nodeNameInIntoPath().stream()
+            .map(this::parseNodeNameInIntoPath)
+            .toArray(String[]::new);
+    return new PartialPath(path);
+  }
+
// Insert Statement ========================================================================
@Override
@@ -1568,6 +1608,20 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return new PartialPath(path);
}
+  /**
+   * Converts a full into path of the parse tree to a {@link PartialPath}.
+   *
+   * <p>The first node is the text of the ROOT token (if present); the parsed node names follow.
+   */
+  private PartialPath parseFullPathInIntoPath(IoTDBSqlParser.FullPathInIntoPathContext ctx) {
+    List<IoTDBSqlParser.NodeNameInIntoPathContext> nodeNames = ctx.nodeNameInIntoPath();
+    // slot 0 is reserved for the ROOT token
+    String[] path = new String[nodeNames.size() + 1];
+    if (ctx.ROOT() != null) {
+      path[0] = ctx.ROOT().getText();
+    }
+    for (int nameIndex = 0; nameIndex < nodeNames.size(); nameIndex++) {
+      path[nameIndex + 1] = parseNodeNameInIntoPath(nodeNames.get(nameIndex));
+    }
+    return new PartialPath(path);
+  }
+
private PartialPath parsePrefixPath(IoTDBSqlParser.PrefixPathContext ctx) {
List<IoTDBSqlParser.NodeNameContext> nodeNames = ctx.nodeName();
String[] path = new String[nodeNames.size() + 1];
@@ -1599,6 +1653,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return parseNodeString(ctx.getText());
}
+  /** Parses the text of a node name that appears in an into path. */
+  private String parseNodeNameInIntoPath(IoTDBSqlParser.NodeNameInIntoPathContext ctx) {
+    String rawNodeName = ctx.getText();
+    return parseNodeStringInIntoPath(rawNodeName);
+  }
+
private String parseNodeString(String nodeName) {
if (nodeName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)
|| nodeName.equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
@@ -1617,16 +1675,43 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
return nodeName;
}
+  /**
+   * Normalizes a node name that appears in an into path.
+   *
+   * <p>The {@code ::} placeholder is returned unchanged. A back-quoted name loses its quotes
+   * unless the quoted content is a real number or does not match the identifier pattern. Any
+   * other name is validated by {@code checkNodeNameInIntoPath} and returned unchanged.
+   */
+  private String parseNodeStringInIntoPath(String nodeName) {
+    if (nodeName.equals(IoTDBConstant.DOUBLE_COLONS)) {
+      return nodeName;
+    }
+
+    boolean isBackQuoted =
+        nodeName.startsWith(TsFileConstant.BACK_QUOTE_STRING)
+            && nodeName.endsWith(TsFileConstant.BACK_QUOTE_STRING);
+    if (!isBackQuoted) {
+      checkNodeNameInIntoPath(nodeName);
+      return nodeName;
+    }
+
+    String unWrapped = nodeName.substring(1, nodeName.length() - 1);
+    // the quotes are kept whenever the content could not stand as an unquoted identifier
+    boolean mustStayQuoted =
+        PathUtils.isRealNumber(unWrapped)
+            || !TsFileConstant.IDENTIFIER_PATTERN.matcher(unWrapped).matches();
+    return mustStayQuoted ? nodeName : unWrapped;
+  }
+
private void checkNodeName(String src) {
// node name could start with * and end with *
if (!TsFileConstant.NODE_NAME_PATTERN.matcher(src).matches()) {
- throw new SQLParserException(
+ throw new SemanticException(
String.format(
"%s is illegal, unquoted node name can only consist of digits, characters and underscore, or start or end with wildcard",
src));
}
}
+  /**
+   * Validates an unquoted node name of an into path.
+   *
+   * @throws SQLParserException if the name does not match the into-path node name pattern
+   */
+  private void checkNodeNameInIntoPath(String src) {
+    // ${} are allowed
+    boolean isLegal = TsFileConstant.NODE_NAME_IN_INTO_PATH_PATTERN.matcher(src).matches();
+    if (isLegal) {
+      return;
+    }
+    throw new SQLParserException(
+        String.format(
+            "%s is illegal, unquoted node name in select into clause can only consist of digits, characters, $, { and }",
+            src));
+  }
+
private void checkIdentifier(String src) {
if (!TsFileConstant.IDENTIFIER_PATTERN.matcher(src).matches()) {
throw new SQLParserException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
new file mode 100644
index 0000000000..080fd88b30
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/DeviceViewIntoPathDescriptor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+/**
+ * Holds the (sourceColumn, targetPath) pairs of a select-into statement grouped by source device,
+ * together with the alignment flag recorded for every target device.
+ */
+public class DeviceViewIntoPathDescriptor {
+
+  // device -> List<(sourceColumn, targetPath)>
+  private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap;
+
+  // targetDevice -> isAlignedDevice
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+
+  /** Creates a descriptor with no source/target pairs and no alignment information. */
+  public DeviceViewIntoPathDescriptor() {
+    this(new HashMap<>(), new HashMap<>());
+  }
+
+  /** Creates a descriptor backed by the given maps; the maps are stored without copying. */
+  public DeviceViewIntoPathDescriptor(
+      Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
+      Map<String, Boolean> targetDeviceToAlignedMap) {
+    this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+  }
+
+  /**
+   * Records that {@code sourceColumn} of {@code sourceDevice} is written to the path formed by
+   * appending {@code targetMeasurement} to {@code targetDevice}.
+   */
+  public void specifyTargetDeviceMeasurement(
+      PartialPath sourceDevice,
+      PartialPath targetDevice,
+      String sourceColumn,
+      String targetMeasurement) {
+    List<Pair<String, PartialPath>> pairsOfSourceDevice =
+        deviceToSourceTargetPathPairListMap.computeIfAbsent(
+            sourceDevice.toString(), device -> new ArrayList<>());
+    pairsOfSourceDevice.add(new Pair<>(sourceColumn, targetDevice.concatNode(targetMeasurement)));
+  }
+
+  /**
+   * Records the alignment flag of a target device.
+   *
+   * @throws SemanticException if a different flag was already recorded for the same device
+   */
+  public void specifyDeviceAlignment(String targetDevice, boolean isAligned) {
+    if (targetDeviceToAlignedMap.containsKey(targetDevice)) {
+      boolean recordedAlignment = targetDeviceToAlignedMap.get(targetDevice);
+      if (recordedAlignment != isAligned) {
+        throw new SemanticException(
+            "select into: alignment property must be the same for the same device.");
+      }
+    }
+    targetDeviceToAlignedMap.put(targetDevice, isAligned);
+  }
+
+  /**
+   * Checks that no target path occurs more than once across all source devices.
+   *
+   * @throws SemanticException if at least one target path is duplicated
+   */
+  public void validate() {
+    int targetPathCount =
+        deviceToSourceTargetPathPairListMap.values().stream().mapToInt(List::size).sum();
+    HashSet<PartialPath> distinctTargetPaths =
+        deviceToSourceTargetPathPairListMap.values().stream()
+            .flatMap(List::stream)
+            .map(Pair::getRight)
+            .collect(Collectors.toCollection(HashSet::new));
+    // fewer distinct paths than recorded paths means some path was specified twice
+    if (targetPathCount > distinctTargetPaths.size()) {
+      throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+    }
+  }
+
+  public Map<String, List<Pair<String, PartialPath>>> getDeviceToSourceTargetPathPairListMap() {
+    return deviceToSourceTargetPathPairListMap;
+  }
+
+  /**
+   * Builds a nested view of the recorded pairs: sourceDevice -> { targetDevice -> {
+   * targetMeasurement -> sourceColumn } }. A new map is created on every call.
+   */
+  public Map<String, Map<PartialPath, Map<String, String>>> getSourceDeviceToTargetPathMap() {
+    Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap =
+        new HashMap<>();
+    deviceToSourceTargetPathPairListMap.forEach(
+        (sourceDevice, pairs) ->
+            sourceDeviceToTargetPathMap.put(sourceDevice, groupByTargetDevice(pairs)));
+    return sourceDeviceToTargetPathMap;
+  }
+
+  /** Groups the pairs of one source device as targetDevice -> { targetMeasurement -> sourceColumn }. */
+  private static Map<PartialPath, Map<String, String>> groupByTargetDevice(
+      List<Pair<String, PartialPath>> sourceTargetPathPairs) {
+    Map<PartialPath, Map<String, String>> targetPathToSourceMap = new HashMap<>();
+    for (Pair<String, PartialPath> pair : sourceTargetPathPairs) {
+      PartialPath targetPath = pair.right;
+      targetPathToSourceMap
+          .computeIfAbsent(targetPath.getDevicePath(), device -> new HashMap<>())
+          .put(targetPath.getMeasurement(), pair.left);
+    }
+    return targetPathToSourceMap;
+  }
+
+  public Map<String, Boolean> getTargetDeviceToAlignedMap() {
+    return targetDeviceToAlignedMap;
+  }
+
+  /**
+   * Writes the per-device pair lists followed by the alignment map into the buffer, each preceded
+   * by its size.
+   */
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(deviceToSourceTargetPathPairListMap.size(), byteBuffer);
+    for (Map.Entry<String, List<Pair<String, PartialPath>>> deviceEntry :
+        deviceToSourceTargetPathPairListMap.entrySet()) {
+      ReadWriteIOUtils.write(deviceEntry.getKey(), byteBuffer);
+
+      List<Pair<String, PartialPath>> pairs = deviceEntry.getValue();
+      ReadWriteIOUtils.write(pairs.size(), byteBuffer);
+      for (Pair<String, PartialPath> pair : pairs) {
+        ReadWriteIOUtils.write(pair.left, byteBuffer);
+        pair.right.serialize(byteBuffer);
+      }
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), byteBuffer);
+    for (Map.Entry<String, Boolean> alignedEntry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(alignedEntry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(alignedEntry.getValue(), byteBuffer);
+    }
+  }
+
+  /** Stream counterpart of {@link #serialize(ByteBuffer)}; writes the same fields in the same order. */
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(deviceToSourceTargetPathPairListMap.size(), stream);
+    for (Map.Entry<String, List<Pair<String, PartialPath>>> deviceEntry :
+        deviceToSourceTargetPathPairListMap.entrySet()) {
+      ReadWriteIOUtils.write(deviceEntry.getKey(), stream);
+
+      List<Pair<String, PartialPath>> pairs = deviceEntry.getValue();
+      ReadWriteIOUtils.write(pairs.size(), stream);
+      for (Pair<String, PartialPath> pair : pairs) {
+        ReadWriteIOUtils.write(pair.left, stream);
+        pair.right.serialize(stream);
+      }
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), stream);
+    for (Map.Entry<String, Boolean> alignedEntry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(alignedEntry.getKey(), stream);
+      ReadWriteIOUtils.write(alignedEntry.getValue(), stream);
+    }
+  }
+
+  /** Reads a descriptor from the buffer in the order used by {@link #serialize(ByteBuffer)}. */
+  public static DeviceViewIntoPathDescriptor deserialize(ByteBuffer byteBuffer) {
+    int deviceCount = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<String, List<Pair<String, PartialPath>>> deviceToPairs = new HashMap<>(deviceCount);
+    for (int deviceIdx = 0; deviceIdx < deviceCount; deviceIdx++) {
+      String sourceDevice = ReadWriteIOUtils.readString(byteBuffer);
+      int pairCount = ReadWriteIOUtils.readInt(byteBuffer);
+      List<Pair<String, PartialPath>> pairs = new ArrayList<>(pairCount);
+      for (int pairIdx = 0; pairIdx < pairCount; pairIdx++) {
+        // the source column is read before the target path, matching the write order
+        String sourceColumn = ReadWriteIOUtils.readString(byteBuffer);
+        PartialPath targetPath = PartialPath.deserialize(byteBuffer);
+        pairs.add(new Pair<>(sourceColumn, targetPath));
+      }
+      deviceToPairs.put(sourceDevice, pairs);
+    }
+
+    int alignedCount = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<String, Boolean> deviceToAligned = new HashMap<>(alignedCount);
+    for (int i = 0; i < alignedCount; i++) {
+      String targetDevice = ReadWriteIOUtils.readString(byteBuffer);
+      deviceToAligned.put(targetDevice, ReadWriteIOUtils.readBool(byteBuffer));
+    }
+    return new DeviceViewIntoPathDescriptor(deviceToPairs, deviceToAligned);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    DeviceViewIntoPathDescriptor other = (DeviceViewIntoPathDescriptor) o;
+    if (!deviceToSourceTargetPathPairListMap.equals(other.deviceToSourceTargetPathPairListMap)) {
+      return false;
+    }
+    return targetDeviceToAlignedMap.equals(other.targetDeviceToAlignedMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(deviceToSourceTargetPathPairListMap, targetDeviceToAlignedMap);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
new file mode 100644
index 0000000000..6d55cf1e2e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+/**
+ * Holds the (sourceColumn, targetPath) pairs of a select-into statement together with the
+ * alignment flag recorded for every target device.
+ */
+public class IntoPathDescriptor {
+
+  // List<(sourceColumn, targetPath)>
+  private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
+
+  // targetDevice -> isAlignedDevice
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+
+  /** Creates a descriptor with no source/target pairs and no alignment information. */
+  public IntoPathDescriptor() {
+    this(new ArrayList<>(), new HashMap<>());
+  }
+
+  /** Creates a descriptor backed by the given collections; they are stored without copying. */
+  public IntoPathDescriptor(
+      List<Pair<String, PartialPath>> sourceTargetPathPairList,
+      Map<String, Boolean> targetDeviceToAlignedMap) {
+    this.sourceTargetPathPairList = sourceTargetPathPairList;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+  }
+
+  /** Records that {@code sourceColumn} is written to {@code targetPath}. */
+  public void specifyTargetPath(String sourceColumn, PartialPath targetPath) {
+    Pair<String, PartialPath> sourceTargetPair = new Pair<>(sourceColumn, targetPath);
+    sourceTargetPathPairList.add(sourceTargetPair);
+  }
+
+  /**
+   * Records the alignment flag of a target device.
+   *
+   * @throws SemanticException if a different flag was already recorded for the same device
+   */
+  public void specifyDeviceAlignment(String targetDevice, boolean isAligned) {
+    if (targetDeviceToAlignedMap.containsKey(targetDevice)) {
+      boolean recordedAlignment = targetDeviceToAlignedMap.get(targetDevice);
+      if (recordedAlignment != isAligned) {
+        throw new SemanticException(
+            "select into: alignment property must be the same for the same device.");
+      }
+    }
+    targetDeviceToAlignedMap.put(targetDevice, isAligned);
+  }
+
+  /**
+   * Checks that no target path occurs more than once.
+   *
+   * @throws SemanticException if at least one target path is duplicated
+   */
+  public void validate() {
+    HashSet<PartialPath> distinctTargetPaths =
+        sourceTargetPathPairList.stream()
+            .map(Pair::getRight)
+            .collect(Collectors.toCollection(HashSet::new));
+    // fewer distinct paths than recorded pairs means some path was specified twice
+    if (sourceTargetPathPairList.size() > distinctTargetPaths.size()) {
+      throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+    }
+  }
+
+  public List<Pair<String, PartialPath>> getSourceTargetPathPairList() {
+    return sourceTargetPathPairList;
+  }
+
+  public Map<String, Boolean> getTargetDeviceToAlignedMap() {
+    return targetDeviceToAlignedMap;
+  }
+
+  /**
+   * Builds a nested view of the recorded pairs: targetDevice -> { targetMeasurement ->
+   * sourceColumn }. A new map is created on every call.
+   */
+  public Map<PartialPath, Map<String, String>> getTargetPathToSourceMap() {
+    Map<PartialPath, Map<String, String>> targetPathToSourceMap = new HashMap<>();
+    for (Pair<String, PartialPath> pair : sourceTargetPathPairList) {
+      PartialPath targetPath = pair.right;
+      targetPathToSourceMap
+          .computeIfAbsent(targetPath.getDevicePath(), device -> new HashMap<>())
+          .put(targetPath.getMeasurement(), pair.left);
+    }
+    return targetPathToSourceMap;
+  }
+
+  /**
+   * Writes the pair list followed by the alignment map into the buffer, each preceded by its
+   * size.
+   */
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(sourceTargetPathPairList.size(), byteBuffer);
+    for (Pair<String, PartialPath> pair : sourceTargetPathPairList) {
+      ReadWriteIOUtils.write(pair.left, byteBuffer);
+      pair.right.serialize(byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), byteBuffer);
+    for (Map.Entry<String, Boolean> alignedEntry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(alignedEntry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(alignedEntry.getValue(), byteBuffer);
+    }
+  }
+
+  /** Stream counterpart of {@link #serialize(ByteBuffer)}; writes the same fields in the same order. */
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(sourceTargetPathPairList.size(), stream);
+    for (Pair<String, PartialPath> pair : sourceTargetPathPairList) {
+      ReadWriteIOUtils.write(pair.left, stream);
+      pair.right.serialize(stream);
+    }
+
+    ReadWriteIOUtils.write(targetDeviceToAlignedMap.size(), stream);
+    for (Map.Entry<String, Boolean> alignedEntry : targetDeviceToAlignedMap.entrySet()) {
+      ReadWriteIOUtils.write(alignedEntry.getKey(), stream);
+      ReadWriteIOUtils.write(alignedEntry.getValue(), stream);
+    }
+  }
+
+  /** Reads a descriptor from the buffer in the order used by {@link #serialize(ByteBuffer)}. */
+  public static IntoPathDescriptor deserialize(ByteBuffer byteBuffer) {
+    int pairCount = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Pair<String, PartialPath>> pairs = new ArrayList<>(pairCount);
+    for (int pairIdx = 0; pairIdx < pairCount; pairIdx++) {
+      // the source column is read before the target path, matching the write order
+      String sourceColumn = ReadWriteIOUtils.readString(byteBuffer);
+      PartialPath targetPath = PartialPath.deserialize(byteBuffer);
+      pairs.add(new Pair<>(sourceColumn, targetPath));
+    }
+
+    int alignedCount = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<String, Boolean> deviceToAligned = new HashMap<>(alignedCount);
+    for (int i = 0; i < alignedCount; i++) {
+      String targetDevice = ReadWriteIOUtils.readString(byteBuffer);
+      deviceToAligned.put(targetDevice, ReadWriteIOUtils.readBool(byteBuffer));
+    }
+    return new IntoPathDescriptor(pairs, deviceToAligned);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    IntoPathDescriptor other = (IntoPathDescriptor) o;
+    if (!sourceTargetPathPairList.equals(other.sourceTargetPathPairList)) {
+      return false;
+    }
+    return targetDeviceToAlignedMap.equals(other.targetDeviceToAlignedMap);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sourceTargetPathPairList, targetDeviceToAlignedMap);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
new file mode 100644
index 0000000000..240a5333e4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.component;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
+
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.checkIsAllRawSeriesQuery;
+
+/**
+ * This class maintains information of {@code INTO} clause: the list of INTO items, the checks
+ * that match them against the source columns, and iterators over the target path templates.
+ */
+public class IntoComponent extends StatementNode {
+
+  // Error messages shared with other classes; final so that no caller can reassign them.
+  public static final String PLACEHOLDER_MISMATCH_ERROR_MSG =
+      "select into: the correspondence between the placeholder and the raw time series could not be established.";
+  public static final String FORBID_PLACEHOLDER_ERROR_MSG =
+      "select into: placeholders can only be used in raw time series data queries.";
+  public static final String DEVICE_NUM_MISMATCH_ERROR_MSG =
+      "select into: the number of source devices and the number of target devices should be the same.";
+  public static final String PATH_NUM_MISMATCH_ERROR_MSG =
+      "select into: the number of source columns and the number of target paths should be the same.";
+  public static final String DUPLICATE_TARGET_PATH_ERROR_MSG =
+      "select into: target paths in into clause should be different.";
+
+  private final List<IntoItem> intoItems;
+
+  public IntoComponent(List<IntoItem> intoItems) {
+    this.intoItems = intoItems;
+  }
+
+  /** Returns true if the target device of at least one INTO item contains a placeholder. */
+  public boolean isDeviceExistPlaceholder() {
+    return intoItems.stream().anyMatch(IntoItem::isDeviceExistPlaceholder);
+  }
+
+  /** Returns true if the target measurements of at least one INTO item contain a placeholder. */
+  public boolean isMeasurementsExistPlaceholder() {
+    return intoItems.stream().anyMatch(IntoItem::isMeasurementsExistPlaceholder);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // used in ALIGN BY TIME
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks the INTO items against the source columns.
+   *
+   * <p>Placeholders are rejected unless all source columns are raw series. With measurement
+   * placeholders every item must have exactly one measurement, and there must be either a single
+   * item (device placeholder present) or one item per source column. Without measurement
+   * placeholders the total number of target measurements must equal the number of source columns.
+   *
+   * @throws SemanticException if any of the rules above is violated
+   */
+  public void validate(List<Expression> sourceColumns) {
+    boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(sourceColumns);
+
+    if (!isAllRawSeriesQuery) {
+      if (isDeviceExistPlaceholder() || isMeasurementsExistPlaceholder()) {
+        throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+      }
+    }
+
+    if (isMeasurementsExistPlaceholder()) {
+      for (IntoItem intoItem : intoItems) {
+        if (intoItem.getIntoMeasurements().size() != 1) {
+          throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+        }
+      }
+
+      if (isDeviceExistPlaceholder()) {
+        if (intoItems.size() != 1) {
+          throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+        }
+      } else {
+        if (intoItems.size() != sourceColumns.size()) {
+          throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+        }
+      }
+    } else {
+      int intoPathsNum =
+          intoItems.stream().mapToInt(item -> item.getIntoMeasurements().size()).sum();
+      if (intoPathsNum != sourceColumns.size()) {
+        throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+      }
+    }
+  }
+
+  /** Creates an iterator over the target path templates, positioned at the first one. */
+  public IntoPathIterator getIntoPathIterator() {
+    return new IntoPathIterator(
+        intoItems, isDeviceExistPlaceholder(), isMeasurementsExistPlaceholder());
+  }
+
+  /** Iterator that walks the target path templates one source column at a time. */
+  public static class IntoPathIterator extends AbstractIntoIterator {
+
+    public IntoPathIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+    }
+
+    /**
+     * Moves to the next template. With measurement placeholders only the item index advances, and
+     * only when there is no device placeholder. Otherwise the measurement index advances and
+     * rolls over to the first measurement of the next item once the current item is exhausted.
+     */
+    public void next() {
+      if (isMeasurementsExistPlaceholder) {
+        if (!isDeviceExistPlaceholder) {
+          deviceIndex++;
+        }
+      } else {
+        measurementIndex++;
+        if (measurementIndex == intoItems.get(deviceIndex).getIntoMeasurements().size()) {
+          deviceIndex++;
+          measurementIndex = 0;
+        }
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // used in ALIGN BY DEVICE
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks the INTO items against the source devices and source columns.
+   *
+   * <p>Measurement placeholders are rejected unless all source columns are raw series. There must
+   * be a single item when a device placeholder is present, otherwise one item per source device.
+   * Each item must have exactly one measurement if it uses a measurement placeholder, otherwise
+   * one measurement per source column.
+   *
+   * @throws SemanticException if any of the rules above is violated
+   */
+  public void validate(List<PartialPath> sourceDevices, List<Expression> sourceColumns) {
+    boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(sourceColumns);
+
+    if (!isAllRawSeriesQuery) {
+      if (isMeasurementsExistPlaceholder()) {
+        throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+      }
+    }
+
+    if (isDeviceExistPlaceholder()) {
+      if (intoItems.size() != 1) {
+        throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+      }
+    } else {
+      if (intoItems.size() != sourceDevices.size()) {
+        throw new SemanticException(DEVICE_NUM_MISMATCH_ERROR_MSG);
+      }
+    }
+
+    for (IntoItem intoItem : intoItems) {
+      List<String> intoMeasurements = intoItem.getIntoMeasurements();
+      if (intoItem.isMeasurementsExistPlaceholder()) {
+        if (intoMeasurements.size() != 1) {
+          throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+        }
+      } else {
+        if (intoMeasurements.size() != sourceColumns.size()) {
+          throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+        }
+      }
+    }
+  }
+
+  /** Creates an iterator over target devices and measurements, positioned at the first ones. */
+  public IntoDeviceMeasurementIterator getIntoDeviceMeasurementIterator() {
+    return new IntoDeviceMeasurementIterator(
+        intoItems, isDeviceExistPlaceholder(), isMeasurementsExistPlaceholder());
+  }
+
+  /** Iterator that advances the target device and the target measurement independently. */
+  public static class IntoDeviceMeasurementIterator extends AbstractIntoIterator {
+
+    public IntoDeviceMeasurementIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+    }
+
+    /** Moves to the next item and its first measurement; a no-op with a device placeholder. */
+    public void nextDevice() {
+      if (!isDeviceExistPlaceholder) {
+        deviceIndex++;
+        measurementIndex = 0;
+      }
+    }
+
+    /**
+     * Moves to the next measurement of the current item, wrapping around to the first one after
+     * the last; a no-op when the current item uses a measurement placeholder.
+     */
+    public void nextMeasurement() {
+      if (!intoItems.get(deviceIndex).isMeasurementsExistPlaceholder()) {
+        measurementIndex++;
+        if (measurementIndex == intoItems.get(deviceIndex).getIntoMeasurements().size()) {
+          measurementIndex = 0;
+        }
+      }
+    }
+  }
+
+  /** Shared cursor state (item index and measurement index) and accessors for the iterators. */
+  public abstract static class AbstractIntoIterator {
+
+    protected final List<IntoItem> intoItems;
+
+    protected final boolean isDeviceExistPlaceholder;
+    protected final boolean isMeasurementsExistPlaceholder;
+
+    protected int deviceIndex;
+    protected int measurementIndex;
+
+    public AbstractIntoIterator(
+        List<IntoItem> intoItems,
+        boolean isDeviceExistPlaceholder,
+        boolean isMeasurementsExistPlaceholder) {
+      this.intoItems = intoItems;
+      this.isDeviceExistPlaceholder = isDeviceExistPlaceholder;
+      this.isMeasurementsExistPlaceholder = isMeasurementsExistPlaceholder;
+      this.deviceIndex = 0;
+      this.measurementIndex = 0;
+    }
+
+    /** Returns the target device of the current item. */
+    public PartialPath getDeviceTemplate() {
+      return intoItems.get(deviceIndex).getIntoDevice();
+    }
+
+    /** Returns the current target measurement of the current item. */
+    public String getMeasurementTemplate() {
+      return intoItems.get(deviceIndex).getIntoMeasurements().get(measurementIndex);
+    }
+
+    /** Returns the aligned flag of the current item. */
+    public boolean isAlignedDevice() {
+      return intoItems.get(deviceIndex).isAligned();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoItem.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoItem.java
new file mode 100644
index 0000000000..4efafe3cef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoItem.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.statement.component;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+
+/** One item of an INTO clause: a target device, its target measurements and an aligned flag. */
+public class IntoItem extends StatementNode {
+
+  private final PartialPath intoDevice;
+  private final List<String> intoMeasurements;
+  private final boolean isAligned;
+
+  public IntoItem(PartialPath intoDevice, List<String> intoMeasurements, boolean isAligned) {
+    this.intoDevice = intoDevice;
+    this.intoMeasurements = intoMeasurements;
+    this.isAligned = isAligned;
+  }
+
+  public PartialPath getIntoDevice() {
+    return intoDevice;
+  }
+
+  public List<String> getIntoMeasurements() {
+    return intoMeasurements;
+  }
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  /**
+   * Returns true if the target device has a {@code DOUBLE_COLONS} node or its full path contains
+   * a match of {@code LEVELED_PATH_TEMPLATE_PATTERN}.
+   */
+  public boolean isDeviceExistPlaceholder() {
+    if (intoDevice.containNode(DOUBLE_COLONS)) {
+      return true;
+    }
+    return LEVELED_PATH_TEMPLATE_PATTERN.matcher(intoDevice.getFullPath()).find();
+  }
+
+  /**
+   * Returns true if any target measurement equals {@code DOUBLE_COLONS} or contains a match of
+   * {@code LEVELED_PATH_TEMPLATE_PATTERN}.
+   */
+  public boolean isMeasurementsExistPlaceholder() {
+    return intoMeasurements.stream()
+        .anyMatch(
+            measurement ->
+                measurement.equals(DOUBLE_COLONS)
+                    || LEVELED_PATH_TEMPLATE_PATTERN.matcher(measurement).find());
+  }
+
+  /** Returns the target paths obtained by appending each target measurement to the device. */
+  public List<PartialPath> getIntoPaths() {
+    return intoMeasurements.stream()
+        .map(measurement -> intoDevice.concatNode(measurement))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index a4011d0ed4..ae6586de5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.HavingCondition;
+import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
@@ -98,6 +99,9 @@ public class QueryStatement extends Statement {
// `GROUP BY TAG` clause
protected GroupByTagComponent groupByTagComponent;
+ // `INTO` clause
+ protected IntoComponent intoComponent;
+
public QueryStatement() {
this.statementType = StatementType.QUERY;
}
@@ -254,6 +258,10 @@ public class QueryStatement extends Statement {
return groupByTimeComponent != null;
}
+ /** Returns true if the result set format of this statement is {@code ALIGN_BY_TIME}. */
+ public boolean isAlignByTime() {
+ return resultSetFormat == ResultSetFormat.ALIGN_BY_TIME;
+ }
+
public boolean isAlignByDevice() {
return resultSetFormat == ResultSetFormat.ALIGN_BY_DEVICE;
}
@@ -274,6 +282,14 @@ public class QueryStatement extends Statement {
return orderByComponent != null && orderByComponent.isOrderByDevice();
}
+ /** Returns the {@code INTO} clause of this statement; null when none has been set. */
+ public IntoComponent getIntoComponent() {
+ return intoComponent;
+ }
+
+ /** Sets the {@code INTO} clause of this statement. */
+ public void setIntoComponent(IntoComponent intoComponent) {
+ this.intoComponent = intoComponent;
+ }
+
public Ordering getResultTimeOrder() {
if (orderByComponent == null || !orderByComponent.isOrderByTime()) {
return Ordering.ASC;
@@ -288,6 +304,10 @@ public class QueryStatement extends Statement {
return orderByComponent.getSortItemList();
}
+ /** Returns true if an {@code INTO} clause has been set on this statement. */
+ public boolean isSelectInto() {
+ return intoComponent != null;
+ }
+
public void semanticCheck() {
if (isAggregationQuery()) {
if (disableAlign()) {
@@ -385,6 +405,24 @@ public class QueryStatement extends Statement {
"Sorting by device is only supported in ALIGN BY DEVICE queries.");
}
}
+
+ if (isSelectInto()) {
+ if (getSeriesLimit() > 0) {
+ throw new SemanticException("select into: slimit clauses are not supported.");
+ }
+ if (getSeriesOffset() > 0) {
+ throw new SemanticException("select into: soffset clauses are not supported.");
+ }
+ if (disableAlign()) {
+ throw new SemanticException("select into: disable align clauses are not supported.");
+ }
+ if (isLastQuery()) {
+ throw new SemanticException("select into: last clauses are not supported.");
+ }
+ if (isGroupByTag()) {
+ throw new SemanticException("select into: GROUP BY TAGS clause are not supported.");
+ }
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index f8dfc07c97..d6fa5100e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -1271,25 +1271,31 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
private SelectIntoOperator parseAndConstructSelectIntoOperator(
IoTDBSqlParser.SelectStatementContext ctx) {
+ IoTDBSqlParser.IntoClauseContext intoClauseContext = ctx.intoClause();
+ if (intoClauseContext.intoItem().size() > 0) {
+ throw new SQLParserException(
+ "The new syntax of SELECT INTO statement will be supported starting from v0.14");
+ }
+
if (queryOp.getFromComponent().getPrefixPaths().size() != 1) {
throw new SQLParserException(
"select into: the number of prefix paths in the from clause should be 1.");
}
-
int sourcePathsCount = queryOp.getSelectComponent().getResultColumns().size();
- if (sourcePathsCount != ctx.intoClause().intoPath().size()) {
+ if (sourcePathsCount != intoClauseContext.intoPath().size()) {
throw new SQLParserException(
"select into: the number of source paths and the number of target paths should be the same.");
}
SelectIntoOperator selectIntoOperator = new SelectIntoOperator();
selectIntoOperator.setQueryOperator(queryOp);
+
List<PartialPath> intoPaths = new ArrayList<>();
for (int i = 0; i < sourcePathsCount; ++i) {
- intoPaths.add(parseIntoPath(ctx.intoClause().intoPath(i)));
+ intoPaths.add(parseIntoPath(intoClauseContext.intoPath(i)));
}
selectIntoOperator.setIntoPaths(intoPaths);
- selectIntoOperator.setIntoPathsAligned(ctx.intoClause().ALIGNED() != null);
+ selectIntoOperator.setIntoPathsAligned(intoClauseContext.ALIGNED() != null);
return selectIntoOperator;
}
@@ -1304,8 +1310,9 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
}
PartialPath intoPath = null;
- if (intoPathContext.fullPath() != null) {
- intoPath = parseFullPathInSelectInto(intoPathContext.fullPath());
+ if (intoPathContext instanceof IoTDBSqlParser.FullPathInIntoPathContext) {
+ intoPath =
+ parseFullPathInSelectInto((IoTDBSqlParser.FullPathInIntoPathContext) intoPathContext);
Matcher m = leveledPathNodePattern.matcher(intoPath.getFullPath());
while (m.find()) {
@@ -1321,9 +1328,9 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
"the x of ${x} should be greater than 0 and equal to or less than <level> or the length of queried path prefix.");
}
}
- } else if (intoPathContext.nodeNameWithoutWildcard() != null) {
- List<IoTDBSqlParser.NodeNameWithoutWildcardContext> nodeNameWithoutStars =
- intoPathContext.nodeNameWithoutWildcard();
+ } else if (intoPathContext instanceof IoTDBSqlParser.SuffixPathInIntoPathContext) {
+ List<IoTDBSqlParser.NodeNameInIntoPathContext> nodeNameWithoutStars =
+ ((IoTDBSqlParser.SuffixPathInIntoPathContext) intoPathContext).nodeNameInIntoPath();
String[] intoPathNodes =
new String[1 + levelLimitOfSourcePrefixPath + nodeNameWithoutStars.size()];
@@ -2565,15 +2572,14 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
return new PartialPath(path);
}
- private PartialPath parseFullPathInSelectInto(IoTDBSqlParser.FullPathContext ctx) {
- List<IoTDBSqlParser.NodeNameWithoutWildcardContext> nodeNamesWithoutStar =
- ctx.nodeNameWithoutWildcard();
+ private PartialPath parseFullPathInSelectInto(IoTDBSqlParser.FullPathInIntoPathContext ctx) {
+ List<IoTDBSqlParser.NodeNameInIntoPathContext> nodeNamesWithoutStar = ctx.nodeNameInIntoPath();
String[] path = new String[nodeNamesWithoutStar.size() + 1];
int i = 0;
if (ctx.ROOT() != null) {
path[0] = ctx.ROOT().getText();
}
- for (IoTDBSqlParser.NodeNameWithoutWildcardContext nodeNameWithoutStar : nodeNamesWithoutStar) {
+ for (IoTDBSqlParser.NodeNameInIntoPathContext nodeNameWithoutStar : nodeNamesWithoutStar) {
i++;
path[i] = parseNodeNameWithoutWildCardInSelectInto(nodeNameWithoutStar);
}
@@ -2642,7 +2648,7 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
/** in select into, $ and {} are allowed */
private String parseNodeNameWithoutWildCardInSelectInto(
- IoTDBSqlParser.NodeNameWithoutWildcardContext ctx) {
+ IoTDBSqlParser.NodeNameInIntoPathContext ctx) {
String nodeName = ctx.getText();
if (nodeName.equals(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)
|| nodeName.equals(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index 02d4643fe5..1f51560c06 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.ratis.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.ratis.thirdparty.com.google.common.collect.Sets;
@@ -51,9 +52,13 @@ import org.junit.Assert;
import org.junit.Test;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
import static org.junit.Assert.assertEquals;
@@ -585,6 +590,170 @@ public class AnalyzeTest {
1);
}
+ @Test
+ public void testSelectIntoPath() throws IllegalPathException {
+ List<String> sqls =
+ Arrays.asList(
+ "SELECT s1, s2 INTO root.sg_copy.d1(t1, t2), root.sg_copy.d2(t1, t2) FROM root.sg.d1, root.sg.d2;",
+ "SELECT s1, s2 INTO root.sg_copy.d1(t1, t2, t3, t4) FROM root.sg.d1, root.sg.d2;",
+ "SELECT s1, s2 INTO root.sg_copy.d1(t1), root.sg_copy.d2(t1, t2), root.sg_copy.d3(t1) FROM root.sg.d1, root.sg.d2;",
+ "select count(s1 + s2), last_value(s2) into root.agg.count(s1_add_s2), root.agg.last_value(s2) from root.sg.d1 group by ([0, 100), 10ms);",
+ "select s1 + s2 into root.expr.add(d1s1_d1s2, d1s1_d2s2, d2s1_d1s2, d2s1_d2s2) from root.sg.d1, root.sg.d2;",
+ "select s1, s1 into root.sg_copy.d1(s1, s2) from root.sg.d1",
+ "select s1, s1 into root.sg_copy.d1(s1), root.sg_copy.d2(s1) from root.sg.d1",
+ "select s1, s2 into ::(t1, t1, t2, t2) from root.sg.*;",
+ "select s1, s2 into root.sg_copy.::(::) from root.sg.*;",
+ "select s1, s2 into root.sg_copy.d1_copy(${2}_${3}), root.sg_copy.d1_copy(${2}_${3}), root.sg_copy.d2_copy(${2}_${3}), root.sg_copy.d2_copy(${2}_${3}) from root.sg.d1, root.sg.d2;",
+ "select d1.s1, d1.s2, d2.s1, d2.s2 into ::(s1_1, s2_2), root.sg.d2_2(s3_3), root.backup_${1}.::(s4) from root.sg");
+ List<List<Pair<String, PartialPath>>> results =
+ Arrays.asList(
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.t1")),
+ new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d1.t2")),
+ new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d2.t1")),
+ new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d2.t2"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.t1")),
+ new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d1.t2")),
+ new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d1.t3")),
+ new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d1.t4"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.t1")),
+ new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d2.t1")),
+ new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d2.t2")),
+ new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d3.t1"))),
+ Arrays.asList(
+ new Pair<>(
+ "count(root.sg.d1.s1 + root.sg.d1.s2)",
+ new PartialPath("root.agg.count.s1_add_s2")),
+ new Pair<>("last_value(root.sg.d1.s2)", new PartialPath("root.agg.last_value.s2"))),
+ Arrays.asList(
+ new Pair(
+ "root.sg.d1.s1 + root.sg.d1.s2", new PartialPath("root.expr.add.d1s1_d1s2")),
+ new Pair(
+ "root.sg.d1.s1 + root.sg.d2.s2", new PartialPath("root.expr.add.d1s1_d2s2")),
+ new Pair(
+ "root.sg.d2.s1 + root.sg.d1.s2", new PartialPath("root.expr.add.d2s1_d1s2")),
+ new Pair(
+ "root.sg.d2.s1 + root.sg.d2.s2", new PartialPath("root.expr.add.d2s1_d2s2"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s1")),
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s2"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s1")),
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d2.s1"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg.d1.t1")),
+ new Pair("root.sg.d2.s1", new PartialPath("root.sg.d2.t1")),
+ new Pair("root.sg.d1.s2", new PartialPath("root.sg.d1.t2")),
+ new Pair("root.sg.d2.s2", new PartialPath("root.sg.d2.t2"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1.s1")),
+ new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d2.s1")),
+ new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d1.s2")),
+ new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d2.s2"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg_copy.d1_copy.d1_s1")),
+ new Pair("root.sg.d2.s1", new PartialPath("root.sg_copy.d1_copy.d2_s1")),
+ new Pair("root.sg.d1.s2", new PartialPath("root.sg_copy.d2_copy.d1_s2")),
+ new Pair("root.sg.d2.s2", new PartialPath("root.sg_copy.d2_copy.d2_s2"))),
+ Arrays.asList(
+ new Pair("root.sg.d1.s1", new PartialPath("root.sg.d1.s1_1")),
+ new Pair("root.sg.d1.s2", new PartialPath("root.sg.d1.s2_2")),
+ new Pair("root.sg.d2.s1", new PartialPath("root.sg.d2_2.s3_3")),
+ new Pair("root.sg.d2.s2", new PartialPath("root.backup_sg.d2.s4"))));
+
+ for (int i = 0; i < sqls.size(); i++) {
+ Analysis analysis = analyzeSQL(sqls.get(i));
+ assert analysis != null;
+ Assert.assertEquals(
+ results.get(i), analysis.getIntoPathDescriptor().getSourceTargetPathPairList());
+ }
+ }
+
+ @Test
+ public void testSelectIntoPathAlignByDevice() throws IllegalPathException {
+ List<String> sqls =
+ Arrays.asList(
+ "select s1, s2 into root.sg_copy.::(t1, t2) from root.sg.d1, root.sg.d2 align by device;",
+ "select s1 + s2 into root.expr.add(d1s1_d1s2), root.expr.add(d2s1_d2s2) from root.sg.d1, root.sg.d2 align by device;",
+ "select count(s1), last_value(s2) into root.agg.::(count_s1, last_value_s2) from root.sg.d1, root.sg.d2 group by ([0, 100), 10ms) align by device;",
+ "select s1, s2 into root.sg1.new_d1(::), root.sg2.new_d2(::) from root.sg.d1, root.sg.d2 align by device;",
+ "select s1, s2 into root.sg1.new_${2}(::) from root.sg.d1, root.sg.d2 align by device;");
+
+ List<Map<String, List<Pair<String, PartialPath>>>> results = new ArrayList<>();
+ Map<String, List<Pair<String, PartialPath>>> resultMap1 = new HashMap<>();
+ resultMap1.put(
+ "root.sg.d1",
+ Arrays.asList(
+ new Pair<>("s1", new PartialPath("root.sg_copy.d1.t1")),
+ new Pair<>("s2", new PartialPath("root.sg_copy.d1.t2"))));
+ resultMap1.put(
+ "root.sg.d2",
+ Arrays.asList(
+ new Pair<>("s1", new PartialPath("root.sg_copy.d2.t1")),
+ new Pair<>("s2", new PartialPath("root.sg_copy.d2.t2"))));
+ results.add(resultMap1);
+
+ Map<String, List<Pair<String, PartialPath>>> resultMap2 = new HashMap<>();
+ resultMap2.put(
+ "root.sg.d1",
+ Collections.singletonList(
+ new Pair<>("s1 + s2", new PartialPath("root.expr.add.d1s1_d1s2"))));
+ resultMap2.put(
+ "root.sg.d2",
+ Collections.singletonList(
+ new Pair<>("s1 + s2", new PartialPath("root.expr.add.d2s1_d2s2"))));
+ results.add(resultMap2);
+
+ Map<String, List<Pair<String, PartialPath>>> resultMap3 = new HashMap<>();
+ resultMap3.put(
+ "root.sg.d1",
+ Arrays.asList(
+ new Pair<>("count(s1)", new PartialPath("root.agg.d1.count_s1")),
+ new Pair<>("last_value(s2)", new PartialPath("root.agg.d1.last_value_s2"))));
+ resultMap3.put(
+ "root.sg.d2",
+ Arrays.asList(
+ new Pair<>("count(s1)", new PartialPath("root.agg.d2.count_s1")),
+ new Pair<>("last_value(s2)", new PartialPath("root.agg.d2.last_value_s2"))));
+ results.add(resultMap3);
+
+ Map<String, List<Pair<String, PartialPath>>> resultMap4 = new HashMap<>();
+ resultMap4.put(
+ "root.sg.d1",
+ Arrays.asList(
+ new Pair<>("s1", new PartialPath("root.sg1.new_d1.s1")),
+ new Pair<>("s2", new PartialPath("root.sg1.new_d1.s2"))));
+ resultMap4.put(
+ "root.sg.d2",
+ Arrays.asList(
+ new Pair<>("s1", new PartialPath("root.sg2.new_d2.s1")),
+ new Pair<>("s2", new PartialPath("root.sg2.new_d2.s2"))));
+ results.add(resultMap4);
+
+ Map<String, List<Pair<String, PartialPath>>> resultMap5 = new HashMap<>();
+ resultMap5.put(
+ "root.sg.d1",
+ Arrays.asList(
+ new Pair<>("s1", new PartialPath("root.sg1.new_d1.s1")),
+ new Pair<>("s2", new PartialPath("root.sg1.new_d1.s2"))));
+ resultMap5.put(
+ "root.sg.d2",
+ Arrays.asList(
+ new Pair<>("s1", new PartialPath("root.sg1.new_d2.s1")),
+ new Pair<>("s2", new PartialPath("root.sg1.new_d2.s2"))));
+ results.add(resultMap5);
+
+ for (int i = 0; i < sqls.size(); i++) {
+ Analysis analysis = analyzeSQL(sqls.get(i));
+ assert analysis != null;
+ Assert.assertEquals(
+ results.get(i),
+ analysis.getDeviceViewIntoPathDescriptor().getDeviceToSourceTargetPathPairListMap());
+ }
+ }
+
private Analysis analyzeSQL(String sql) {
try {
Statement statement =
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
index 86ea19abcd..956a34a953 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/constant/TsFileConstant.java
@@ -44,5 +44,9 @@ public class TsFileConstant {
private static final String NODE_NAME_MATCHER = "(\\*{0,2}[a-zA-Z0-9_\\u2E80-\\u9FFF]+\\*{0,2})";
public static final Pattern NODE_NAME_PATTERN = Pattern.compile(NODE_NAME_MATCHER);
+ private static final String NODE_NAME_IN_INTO_PATH_MATCHER = "([a-zA-Z0-9_${}\\u2E80-\\u9FFF]+)";
+ public static final Pattern NODE_NAME_IN_INTO_PATH_PATTERN =
+ Pattern.compile(NODE_NAME_IN_INTO_PATH_MATCHER);
+
private TsFileConstant() {}
}