You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/13 10:57:34 UTC
[iotdb] 02/04: refactor analyzer
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/mppSelectInto
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4904bea0ad14151e73ac4a3ebe10d1432b73728b
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Thu Oct 13 16:03:29 2022 +0800
refactor analyzer
---
.../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 31 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 398 +++------------------
.../iotdb/db/mpp/plan/analyze/SelectIntoUtils.java | 105 ++++++
.../parameter/IntoDeviceMeasurementDescriptor.java | 57 +++
.../planner/plan/parameter/IntoPathDescriptor.java | 60 ++++
.../plan/statement/component/IntoComponent.java | 188 +++++++++-
6 files changed, 479 insertions(+), 360 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 119f67654a..ea344ca9fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.path.PatternTreeMap;
-import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.NodeRef;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
@@ -33,6 +31,8 @@ import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoDeviceMeasurementDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -140,12 +140,14 @@ public class Analysis {
private DatasetHeader respDatasetHeader;
/////////////////////////////////////////////////////////////////////////////////////////////////
- // SELECT INTO Analysis (used in ALIGN BY DEVICE)
+ // SELECT INTO Analysis
/////////////////////////////////////////////////////////////////////////////////////////////////
- private PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap;
+ // used in ALIGN BY DEVICE
+ private IntoDeviceMeasurementDescriptor intoDeviceMeasurementDescriptor;
- private Map<PartialPath, Boolean> intoDeviceToAlignedMap;
+ // used in ALIGN BY TIME
+ private IntoPathDescriptor intoPathDescriptor;
/////////////////////////////////////////////////////////////////////////////////////////////////
// Schema Query Analysis
@@ -414,21 +416,20 @@ public class Analysis {
this.deviceViewOutputExpressions = deviceViewOutputExpressions;
}
- public PatternTreeMap<String, PatternTreeMapFactory.StringSerializer>
- getIntoPathPatternTreeMap() {
- return intoPathPatternTreeMap;
+ public IntoDeviceMeasurementDescriptor getIntoDeviceMeasurementDescriptor() {
+ return intoDeviceMeasurementDescriptor;
}
- public void setIntoPathPatternTreeMap(
- PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap) {
- this.intoPathPatternTreeMap = intoPathPatternTreeMap;
+ public void setIntoDeviceMeasurementDescriptor(
+ IntoDeviceMeasurementDescriptor intoDeviceMeasurementDescriptor) {
+ this.intoDeviceMeasurementDescriptor = intoDeviceMeasurementDescriptor;
}
- public Map<PartialPath, Boolean> getIntoDeviceToAlignedMap() {
- return intoDeviceToAlignedMap;
+ public IntoPathDescriptor getIntoPathDescriptor() {
+ return intoPathDescriptor;
}
- public void setIntoDeviceToAlignedMap(Map<PartialPath, Boolean> intoDeviceToAlignedMap) {
- this.intoDeviceToAlignedMap = intoDeviceToAlignedMap;
+ public void setIntoPathDescriptor(IntoPathDescriptor intoPathDescriptor) {
+ this.intoPathDescriptor = intoPathDescriptor;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index f28f1064fb..dcd90bf3fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
-import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
@@ -39,7 +38,6 @@ import org.apache.iotdb.db.exception.metadata.template.TemplateImcompatibeExcept
import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
-import org.apache.iotdb.db.metadata.path.PatternTreeMapFactory;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
@@ -54,6 +52,8 @@ import org.apache.iotdb.db.mpp.plan.expression.ExpressionType;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoDeviceMeasurementDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
@@ -61,7 +61,6 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.IntoItem;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
@@ -142,16 +141,16 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
-import java.util.regex.Matcher;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
-import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.metadata.MetadataConstant.ALL_RESULT_NODES;
import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetDevice;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetMeasurement;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.constructTargetPath;
/** This visitor is used to analyze each type of Statement and returns the {@link Analysis}. */
public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> {
@@ -1000,368 +999,85 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
QueryStatement queryStatement,
Set<PartialPath> deviceSet,
List<Pair<Expression, String>> outputExpressions) {
- List<Expression> outputColumns =
+ if (!queryStatement.isSelectInto()) {
+ return;
+ }
+
+ List<PartialPath> sourceDevices = new ArrayList<>(deviceSet);
+ List<Expression> sourceColumns =
outputExpressions.stream()
.map(Pair::getLeft)
.collect(Collectors.toCollection(ArrayList::new));
- boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(outputColumns);
IntoComponent intoComponent = queryStatement.getIntoComponent();
- List<IntoItem> intoItems = intoComponent.getIntoItems();
-
- List<PartialPath> sourceDevices = new ArrayList<>(deviceSet);
- List<PartialPath> targetDevices = new ArrayList<>(sourceDevices.size());
- if (intoComponent.isDeviceExistPlaceholder()) {
- if (intoItems.size() > 1) {
- throw new SemanticException("");
- }
-
- PartialPath deviceTemplate = intoItems.get(0).getIntoDevice();
- for (PartialPath sourceDevice : sourceDevices) {
- targetDevices.add(constructIntoDevice(sourceDevice, deviceTemplate));
- }
- } else {
- if (intoItems.size() != sourceDevices.size()) {
- throw new SemanticException("");
- }
-
- for (IntoItem intoItem : intoItems) {
- targetDevices.add(intoItem.getIntoDevice());
- }
- }
-
- PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap =
- PatternTreeMapFactory.getIntoPathPatternTreeMap();
-
- if (intoComponent.isMeasurementsExistPlaceholder()) {
- if (!isAllRawSeriesQuery) {
- throw new SemanticException("");
- }
-
- if (intoComponent.isDeviceExistPlaceholder()) {
- String measurementTemplate = intoItems.get(0).getIntoMeasurements().get(0);
- for (int i = 0; i < targetDevices.size(); i++) {
- PartialPath targetDevice = targetDevices.get(i);
- for (Expression outputColumn : outputColumns) {
- PartialPath intoPath =
- targetDevice.concatNode(
- constructIntoMeasurement(
- sourceDevices.get(i).concatNode(outputColumn.toString()),
- measurementTemplate));
- intoPathPatternTreeMap.append(intoPath, outputColumn.toString());
- if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
- throw new SemanticException(
- "select into: target paths in into clause should be different.");
- }
- }
- }
- } else {
- for (int deviceIndex = 0; deviceIndex < sourceDevices.size(); deviceIndex++) {
- IntoItem intoItem = intoItems.get(deviceIndex);
- List<String> intoMeasurements = intoItem.getIntoMeasurements();
- PartialPath targetDevice = targetDevices.get(deviceIndex);
-
- if (intoItem.isMeasurementsExistPlaceholder()) {
- if (intoMeasurements.size() > 1) {
- throw new SemanticException("");
- }
-
- String measurementTemplate = intoMeasurements.get(0);
- for (Expression outputColumn : outputColumns) {
- PartialPath intoPath =
- targetDevice.concatNode(
- constructIntoMeasurement(
- sourceDevices.get(deviceIndex).concatNode(outputColumn.toString()),
- measurementTemplate));
- intoPathPatternTreeMap.append(intoPath, outputColumn.toString());
- if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
- throw new SemanticException(
- "select into: target paths in into clause should be different.");
- }
- }
- } else {
- if (intoMeasurements.size() != outputColumns.size()) {
- throw new SemanticException(
- "select into: the number of source columns and the number of target paths should be the same.");
- }
-
- for (int measurementIndex = 0;
- measurementIndex < intoMeasurements.size();
- measurementIndex++) {
- PartialPath intoPath =
- targetDevice.concatNode(intoMeasurements.get(measurementIndex));
- intoPathPatternTreeMap.append(
- intoPath, outputColumns.get(measurementIndex).toString());
- if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
- throw new SemanticException(
- "select into: target paths in into clause should be different.");
- }
- }
- }
- }
- }
- } else {
- for (int deviceIndex = 0; deviceIndex < sourceDevices.size(); deviceIndex++) {
- IntoItem intoItem = intoItems.get(deviceIndex);
- List<String> intoMeasurements = intoItem.getIntoMeasurements();
- if (intoMeasurements.size() != outputColumns.size()) {
- throw new SemanticException(
- "select into: the number of source columns and the number of target paths should be the same.");
- }
-
- PartialPath targetDevice = targetDevices.get(deviceIndex);
- for (int measurementIndex = 0;
- measurementIndex < intoMeasurements.size();
- measurementIndex++) {
- PartialPath intoPath = targetDevice.concatNode(intoMeasurements.get(measurementIndex));
- intoPathPatternTreeMap.append(intoPath, outputColumns.get(measurementIndex).toString());
- if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
- throw new SemanticException(
- "select into: target paths in into clause should be different.");
- }
- }
- }
- }
-
- if (isAllRawSeriesQuery) {
- if (intoComponent.isDeviceExistPlaceholder()) {
- if (intoComponent.isMeasurementsExistPlaceholder()) {
- String measurementTemplate = intoItems.get(0).getIntoMeasurements().get(0);
- for (int i = 0; i < targetDevices.size(); i++) {
- PartialPath targetDevice = targetDevices.get(i);
- for (Expression outputColumn : outputColumns) {
- PartialPath intoPath =
- targetDevice.concatNode(
- constructIntoMeasurement(
- sourceDevices.get(i).concatNode(outputColumn.toString()),
- measurementTemplate));
- intoPathPatternTreeMap.append(intoPath, outputColumn.toString());
- if (intoPathPatternTreeMap.getOverlapped(intoPath).size() > 1) {
- throw new SemanticException(
- "select into: target paths in into clause should be different.");
- }
- }
- }
+ intoComponent.validate(sourceDevices, sourceColumns);
+
+ IntoDeviceMeasurementDescriptor intoDeviceMeasurementDescriptor =
+ new IntoDeviceMeasurementDescriptor();
+ IntoComponent.IntoDeviceMeasurementIterator intoDeviceMeasurementIterator =
+ intoComponent.getIntoDeviceMeasurementIterator();
+ for (PartialPath sourceDevice : sourceDevices) {
+ PartialPath deviceTemplate = intoDeviceMeasurementIterator.getDeviceTemplate();
+ boolean isAlignedDevice = intoDeviceMeasurementIterator.isAlignedDevice();
+ PartialPath targetDevice = constructTargetDevice(sourceDevice, deviceTemplate);
+ intoDeviceMeasurementDescriptor.specifyTargetDevice(sourceDevice, targetDevice);
+ intoDeviceMeasurementDescriptor.specifyDeviceAlignment(targetDevice, isAlignedDevice);
+
+ for (Expression sourceColumn : sourceColumns) {
+ String measurementTemplate = intoDeviceMeasurementIterator.getMeasurementTemplate();
+ String targetMeasurement;
+ if (sourceColumn instanceof TimeSeriesOperand) {
+ targetMeasurement =
+ constructTargetMeasurement(
+ sourceDevice.concatNode(sourceColumn.toString()), measurementTemplate);
} else {
-
+ targetMeasurement = sourceColumn.toString();
}
- }
- } else {
-
- }
- analyzeIntoDevices(analysis, intoItems);
- analysis.setIntoPathPatternTreeMap(intoPathPatternTreeMap);
- }
-
- private PartialPath constructIntoDevice(PartialPath sourceDevice, PartialPath deviceTemplate) {
- String[] sourceNodes = sourceDevice.getNodes();
- String[] templateNodes = deviceTemplate.getNodes();
-
- List<String> targetNodes = new ArrayList<>();
- for (int nodeIndex = 0; nodeIndex < templateNodes.length; nodeIndex++) {
- String curNode = templateNodes[nodeIndex];
- if (curNode.equals(DOUBLE_COLONS)) {
- if (nodeIndex != templateNodes.length - 1) {
- throw new SemanticException("");
- }
- for (; nodeIndex < sourceNodes.length; nodeIndex++) {
- targetNodes.add(sourceNodes[nodeIndex]);
- }
- break;
+ intoDeviceMeasurementDescriptor.specifyTargetMeasurement(
+ targetDevice, sourceColumn.toString(), targetMeasurement);
+ intoDeviceMeasurementIterator.nextMeasurement();
}
- String resNode = curNode;
- Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
- while (matcher.find()) {
- String param = matcher.group();
- int index;
- try {
- index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
- } catch (NumberFormatException e) {
- throw new SemanticException("select into: the i of ${i} should be an integer.");
- }
- if (index < 1 || index >= sourceNodes.length) {
- throw new SemanticException(
- "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
- }
- resNode = matcher.replaceFirst(sourceNodes[index]);
- matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
- }
- targetNodes.add(resNode);
+ intoDeviceMeasurementIterator.nextDevice();
}
- return new PartialPath(targetNodes.toArray(new String[0]));
+ analysis.setIntoDeviceMeasurementDescriptor(intoDeviceMeasurementDescriptor);
}
private void analyzeInto(
Analysis analysis,
QueryStatement queryStatement,
List<Pair<Expression, String>> outputExpressions) {
- if (queryStatement.getIntoComponent() == null) {
+ if (!queryStatement.isSelectInto()) {
return;
}
- List<Expression> outputColumns =
+ List<Expression> sourceColumns =
outputExpressions.stream()
.map(Pair::getLeft)
.collect(Collectors.toCollection(ArrayList::new));
- boolean isAllRawSeriesQuery = checkIsAllRawSeriesQuery(outputColumns);
IntoComponent intoComponent = queryStatement.getIntoComponent();
- List<IntoItem> intoItems = intoComponent.getIntoItems();
-
- List<PartialPath> intoPaths = new ArrayList<>();
- PatternTreeMap<String, PatternTreeMapFactory.StringSerializer> intoPathPatternTreeMap =
- PatternTreeMapFactory.getIntoPathPatternTreeMap();
-
- if (isAllRawSeriesQuery) {
- List<PartialPath> sourcePaths =
- outputColumns.stream()
- .map(expression -> ((TimeSeriesOperand) expression).getPath())
- .collect(Collectors.toList());
-
- if (intoComponent.isDeviceExistPlaceholder()) {
- if (intoComponent.isMeasurementsExistPlaceholder()) {
- if (intoItems.size() > 1) {
- throw new SemanticException("");
- }
- if (intoItems.get(0).getIntoMeasurements().size() > 1) {
- throw new SemanticException("");
- }
-
- PartialPath deviceTemplate = intoItems.get(0).getIntoDevice();
- String measurementTemplate = intoItems.get(0).getIntoMeasurements().get(0);
- for (PartialPath sourcePath : sourcePaths) {
- intoPaths.add(constructIntoPath(sourcePath, deviceTemplate, measurementTemplate));
- }
- } else {
- int sourcePathIndex = 0;
- for (IntoItem intoItem : intoItems) {
- PartialPath deviceTemplate = intoItem.getIntoDevice();
- List<String> measurementList = intoItem.getIntoMeasurements();
- for (String measurement : measurementList) {
- intoPaths.add(
- constructIntoPath(sourcePaths.get(sourcePathIndex), deviceTemplate, measurement));
- sourcePathIndex++;
- }
- }
- }
+ intoComponent.validate(sourceColumns);
+
+ IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
+ IntoComponent.IntoPathIterator intoPathIterator = intoComponent.getIntoPathIterator();
+ for (Expression sourceColumn : sourceColumns) {
+ PartialPath deviceTemplate = intoPathIterator.getDeviceTemplate();
+ String measurementTemplate = intoPathIterator.getMeasurementTemplate();
+ boolean isAlignedDevice = intoPathIterator.isAlignedDevice();
+
+ PartialPath targetPath;
+ if (sourceColumn instanceof TimeSeriesOperand) {
+ PartialPath sourcePath = ((TimeSeriesOperand) sourceColumn).getPath();
+ targetPath = constructTargetPath(sourcePath, deviceTemplate, measurementTemplate);
} else {
- if (intoComponent.isMeasurementsExistPlaceholder()) {
- if (intoItems.size() != outputColumns.size()) {
- throw new SemanticException(
- "select into: the number of source columns and the number of target paths should be the same.");
- }
-
- for (int i = 0; i < intoItems.size(); i++) {
- if (intoItems.get(i).getIntoMeasurements().size() != 1) {
- throw new SemanticException("");
- }
-
- PartialPath targetDevice = intoItems.get(i).getIntoDevice();
- intoPaths.add(
- targetDevice.concatNode(
- constructIntoMeasurement(
- sourcePaths.get(i), intoItems.get(0).getIntoMeasurements().get(0))));
- }
- } else {
- intoPaths =
- intoItems.stream()
- .map(IntoItem::getIntoPaths)
- .flatMap(List::stream)
- .collect(Collectors.toList());
- }
- }
- } else {
- // disable placeholder
- for (IntoItem intoItem : intoItems) {
- if (intoItem.isDeviceExistPlaceholder() || intoItem.isMeasurementsExistPlaceholder()) {
- throw new SemanticException(
- "select into: placeholders can only be used in raw timeseries data queries.");
- }
- }
-
- intoPaths =
- intoItems.stream()
- .map(IntoItem::getIntoPaths)
- .flatMap(List::stream)
- .collect(Collectors.toList());
- }
-
- // check quantity consistency
- if (intoPaths.size() != outputColumns.size()) {
- throw new SemanticException(
- "select into: the number of source columns and the number of target paths should be the same.");
- }
-
- for (int i = 0; i < intoPaths.size(); i++) {
- intoPathPatternTreeMap.append(intoPaths.get(i), outputColumns.get(i).toString());
- if (intoPathPatternTreeMap.getOverlapped(intoPaths.get(i)).size() > 1) {
- throw new SemanticException(
- "select into: target paths in into clause should be different.");
- }
- }
-
- analyzeIntoDevices(analysis, intoItems);
- analysis.setIntoPathPatternTreeMap(intoPathPatternTreeMap);
- }
-
- private void analyzeIntoDevices(Analysis analysis, List<IntoItem> intoItems) {
- Map<PartialPath, Boolean> intoDeviceToAlignedMap = new HashMap<>();
- for (IntoItem intoItem : intoItems) {
- PartialPath devicePath = intoItem.getIntoDevice();
- boolean isAligned = intoItem.isAligned();
- if (intoDeviceToAlignedMap.containsKey(devicePath)
- && intoDeviceToAlignedMap.get(devicePath) != isAligned) {
- throw new SemanticException(
- String.format(
- "select into: inconsistent alignment property specified for device '%s'.",
- devicePath));
- }
- intoDeviceToAlignedMap.put(devicePath, isAligned);
- }
- analysis.setIntoDeviceToAlignedMap(intoDeviceToAlignedMap);
- }
-
- private boolean checkIsAllRawSeriesQuery(List<Expression> expressions) {
- for (Expression expression : expressions) {
- if (!(expression instanceof TimeSeriesOperand)) {
- return false;
- }
- }
- return true;
- }
-
- private PartialPath constructIntoPath(
- PartialPath sourcePath, PartialPath deviceTemplate, String measurementTemplate) {
- PartialPath targetDevice = constructIntoDevice(sourcePath.getDevicePath(), deviceTemplate);
- String targetMeasurement = constructIntoMeasurement(sourcePath, measurementTemplate);
- return targetDevice.concatNode(targetMeasurement);
- }
-
- private String constructIntoMeasurement(PartialPath sourcePath, String measurementTemplate) {
- if (measurementTemplate.equals(DOUBLE_COLONS)) {
- return sourcePath.getMeasurement();
- }
-
- String[] sourceNodes = sourcePath.getNodes();
- String resNode = measurementTemplate;
- Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
- while (matcher.find()) {
- String param = matcher.group();
- int index;
- try {
- index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
- } catch (NumberFormatException e) {
- throw new SemanticException("select into: the i of ${i} should be an integer.");
- }
- if (index < 1 || index >= sourceNodes.length) {
- throw new SemanticException(
- "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
+ targetPath = deviceTemplate.concatNode(measurementTemplate);
}
- resNode = matcher.replaceFirst(sourceNodes[index]);
- matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
+ intoPathDescriptor.specifyTargetPath(sourceColumn.toString(), targetPath);
+ intoPathDescriptor.specifyDeviceAlignment(targetPath.getDevicePath(), isAlignedDevice);
+ intoPathIterator.next();
}
- return resNode;
+ analysis.setIntoPathDescriptor(intoPathDescriptor);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
new file mode 100644
index 0000000000..18107823e5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/SelectIntoUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+
+/**
+ * Static helpers that expand the path templates of a SELECT INTO clause into concrete target
+ * devices, measurements and full paths.
+ *
+ * <p>Two placeholder forms are handled: a template node equal to {@code DOUBLE_COLONS}, and level
+ * placeholders matched by {@code LEVELED_PATH_TEMPLATE_PATTERN} (the <code>${i}</code> form named
+ * in the error messages), each of which is replaced by the i-th node of the source path.
+ */
+public class SelectIntoUtils {
+
+  /**
+   * Builds the full target path for one source series.
+   *
+   * @param sourcePath full path of the source series
+   * @param deviceTemplate device part of the INTO target, possibly containing placeholders
+   * @param measurementTemplate measurement part of the INTO target, possibly containing
+   *     placeholders
+   * @return the expanded target device concatenated with the expanded target measurement
+   * @throws SemanticException if a placeholder in either template is invalid
+   */
+  public static PartialPath constructTargetPath(
+      PartialPath sourcePath, PartialPath deviceTemplate, String measurementTemplate) {
+    PartialPath targetDevice = constructTargetDevice(sourcePath.getDevicePath(), deviceTemplate);
+    String targetMeasurement = constructTargetMeasurement(sourcePath, measurementTemplate);
+    return targetDevice.concatNode(targetMeasurement);
+  }
+
+  /**
+   * Expands a device template against a source device.
+   *
+   * <p>Template nodes are processed left to right. A node equal to {@code DOUBLE_COLONS} must be
+   * the last template node; it is replaced by every source node from that same index up to the end
+   * of the source device. Any other node has its level placeholders substituted.
+   *
+   * @param sourceDevice device path the data is read from
+   * @param deviceTemplate device part of the INTO target
+   * @return the concrete target device path
+   * @throws SemanticException if {@code DOUBLE_COLONS} is not the last template node, or if a level
+   *     placeholder is invalid
+   */
+  public static PartialPath constructTargetDevice(
+      PartialPath sourceDevice, PartialPath deviceTemplate) {
+    String[] sourceNodes = sourceDevice.getNodes();
+    String[] templateNodes = deviceTemplate.getNodes();
+
+    List<String> targetNodes = new ArrayList<>();
+    for (int nodeIndex = 0; nodeIndex < templateNodes.length; nodeIndex++) {
+      String curNode = templateNodes[nodeIndex];
+      if (curNode.equals(DOUBLE_COLONS)) {
+        if (nodeIndex != templateNodes.length - 1) {
+          throw new SemanticException(
+              "select into: placeholder `::` can only be used at the end of the path.");
+        }
+        // Copy the remaining source nodes, starting at the position the placeholder occupies.
+        for (; nodeIndex < sourceNodes.length; nodeIndex++) {
+          targetNodes.add(sourceNodes[nodeIndex]);
+        }
+        break;
+      }
+
+      targetNodes.add(applyLevelPlaceholder(curNode, sourceNodes));
+    }
+    return new PartialPath(targetNodes.toArray(new String[0]));
+  }
+
+  /**
+   * Expands a measurement template against a source series.
+   *
+   * @param sourcePath full path of the source series
+   * @param measurementTemplate measurement part of the INTO target
+   * @return the source measurement name when the template equals {@code DOUBLE_COLONS}; otherwise
+   *     the template with its level placeholders substituted from the nodes of {@code sourcePath}
+   * @throws SemanticException if a level placeholder is invalid
+   */
+  public static String constructTargetMeasurement(
+      PartialPath sourcePath, String measurementTemplate) {
+    if (measurementTemplate.equals(DOUBLE_COLONS)) {
+      return sourcePath.getMeasurement();
+    }
+    return applyLevelPlaceholder(measurementTemplate, sourcePath.getNodes());
+  }
+
+  /**
+   * Replaces every level placeholder in {@code templateNode} with the referenced source node.
+   *
+   * <p>The template is scanned exactly once, so text coming from a source node is never itself
+   * re-interpreted as a placeholder, and the substituted node is passed through {@link
+   * Matcher#quoteReplacement(String)} so that {@code $} and {@code \} in a node name are inserted
+   * literally instead of being parsed as group references.
+   *
+   * @param templateNode a single template node, possibly containing level placeholders
+   * @param sourceNodes nodes of the source path; valid placeholder indexes are {@code 1} to {@code
+   *     sourceNodes.length - 1}
+   * @return the template with all placeholders substituted
+   * @throws SemanticException if a placeholder index is not an integer or is out of range
+   */
+  private static String applyLevelPlaceholder(String templateNode, String[] sourceNodes) {
+    Matcher matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(templateNode);
+    // StringBuffer (not StringBuilder) keeps appendReplacement usable on Java 8.
+    StringBuffer resNode = new StringBuffer();
+    while (matcher.find()) {
+      String param = matcher.group();
+      int index;
+      try {
+        // Strip the two leading and one trailing delimiter characters around the index.
+        index = Integer.parseInt(param.substring(2, param.length() - 1).trim());
+      } catch (NumberFormatException e) {
+        throw new SemanticException("select into: the i of ${i} should be an integer.");
+      }
+      if (index < 1 || index >= sourceNodes.length) {
+        throw new SemanticException(
+            "select into: the i of ${i} should be greater than 0 and equal to or less than the length of queried path prefix.");
+      }
+      matcher.appendReplacement(resNode, Matcher.quoteReplacement(sourceNodes[index]));
+    }
+    matcher.appendTail(resNode);
+    return resNode.toString();
+  }
+
+  /**
+   * Tells whether every expression is a plain time series reference.
+   *
+   * @param expressions expressions to inspect
+   * @return {@code true} if all of them are {@link TimeSeriesOperand} instances (also for an empty
+   *     list), {@code false} as soon as one is not
+   */
+  public static boolean checkIsAllRawSeriesQuery(List<Expression> expressions) {
+    for (Expression expression : expressions) {
+      if (!(expression instanceof TimeSeriesOperand)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoDeviceMeasurementDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoDeviceMeasurementDescriptor.java
new file mode 100644
index 0000000000..c0a0ec120e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoDeviceMeasurementDescriptor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_DEVICE_ERROR_MSG;
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+/**
+ * SELECT INTO descriptor that tracks, for each target device, the source device mapped onto it
+ * and, through the inherited {@code targetPathToSourceMap}, the source column written to each of
+ * its target measurements.
+ */
+public class IntoDeviceMeasurementDescriptor extends IntoPathDescriptor {
+
+  // target device -> the source device that has been bound to it
+  private final Map<PartialPath, PartialPath> targetDeviceToSourceDeviceMap = new HashMap<>();
+
+  public IntoDeviceMeasurementDescriptor() {
+    super();
+  }
+
+  /**
+   * Binds {@code targetDevice} to {@code sourceDevice}.
+   *
+   * @throws SemanticException if the target device is already bound to a different source device
+   */
+  public void specifyTargetDevice(PartialPath sourceDevice, PartialPath targetDevice) {
+    if (targetDeviceToSourceDeviceMap.containsKey(targetDevice)) {
+      PartialPath boundSource = targetDeviceToSourceDeviceMap.get(targetDevice);
+      if (!boundSource.equals(sourceDevice)) {
+        throw new SemanticException(DUPLICATE_TARGET_DEVICE_ERROR_MSG);
+      }
+    }
+    targetDeviceToSourceDeviceMap.put(targetDevice, sourceDevice);
+  }
+
+  /**
+   * Records that {@code sourceColumn} is written to {@code targetMeasurement} of {@code
+   * targetDevice}.
+   *
+   * @throws SemanticException if that target measurement was already recorded for the device
+   */
+  public void specifyTargetMeasurement(
+      PartialPath targetDevice, String sourceColumn, String targetMeasurement) {
+    Map<String, String> sourceColumnByMeasurement =
+        targetPathToSourceMap.computeIfAbsent(targetDevice, device -> new HashMap<>());
+    if (!sourceColumnByMeasurement.containsKey(targetMeasurement)) {
+      sourceColumnByMeasurement.put(targetMeasurement, sourceColumn);
+      return;
+    }
+    throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
new file mode 100644
index 0000000000..d39515dcf0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/IntoPathDescriptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.parameter;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.plan.statement.component.IntoComponent.DUPLICATE_TARGET_PATH_ERROR_MSG;
+
+ /** Holds the target paths of a SELECT INTO statement and each target device's alignment. */
+ public class IntoPathDescriptor {
+
+   /** Target device mapped to (target measurement mapped to source column). */
+   protected final Map<PartialPath, Map<String, String>> targetPathToSourceMap = new HashMap<>();
+
+   /** Target device mapped to the alignment flag specified for it. */
+   protected final Map<PartialPath, Boolean> deviceToAlignedMap = new HashMap<>();
+
+   /** Maps {@code sourceColumn} to {@code targetPath}; a target path can be used only once. */
+   public void specifyTargetPath(String sourceColumn, PartialPath targetPath) {
+     PartialPath device = targetPath.getDevicePath();
+     String measurement = targetPath.getMeasurement();
+     Map<String, String> sourceByMeasurement =
+         targetPathToSourceMap.computeIfAbsent(device, unused -> new HashMap<>());
+     if (sourceByMeasurement.containsKey(measurement)) {
+       throw new SemanticException(DUPLICATE_TARGET_PATH_ERROR_MSG);
+     }
+     sourceByMeasurement.put(measurement, sourceColumn);
+   }
+
+   /** Stores the alignment of {@code targetDevice}; a conflicting earlier value is rejected. */
+   public void specifyDeviceAlignment(PartialPath targetDevice, boolean isAligned) {
+     boolean known = deviceToAlignedMap.containsKey(targetDevice);
+     if (known && deviceToAlignedMap.get(targetDevice) != isAligned) {
+       throw new SemanticException(
+           "select into: alignment property must be the same for the same device.");
+     }
+     deviceToAlignedMap.put(targetDevice, isAligned);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
index c5848a3752..223e10602f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/IntoComponent.java
@@ -19,23 +19,37 @@
package org.apache.iotdb.db.mpp.plan.statement.component;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import java.util.List;
+import static org.apache.iotdb.db.mpp.plan.analyze.SelectIntoUtils.checkIsAllRawSeriesQuery;
+
/** This class maintains information of {@code INTO} clause. */
public class IntoComponent extends StatementNode {
+ public static final String PLACEHOLDER_MISMATCH_ERROR_MSG =
+     "select into: the correspondence between the placeholder and the raw time series could not be established.";
+ public static final String FORBID_PLACEHOLDER_ERROR_MSG =
+     "select into: placeholders can only be used in raw time series data queries.";
+ public static final String DEVICE_NUM_MISMATCH_ERROR_MSG =
+     "select into: the number of source devices and the number of target devices should be the same.";
+ public static final String PATH_NUM_MISMATCH_ERROR_MSG =
+     "select into: the number of source columns and the number of target paths should be the same.";
+ public static final String DUPLICATE_TARGET_DEVICE_ERROR_MSG =
+     "select into: target devices in into clause should be different.";
+ public static final String DUPLICATE_TARGET_PATH_ERROR_MSG =
+     "select into: target paths in into clause should be different.";
+
private final List<IntoItem> intoItems;
public IntoComponent(List<IntoItem> intoItems) {
this.intoItems = intoItems;
}
- public List<IntoItem> getIntoItems() {
- return intoItems;
- }
-
public boolean isDeviceExistPlaceholder() {
for (IntoItem intoItem : intoItems) {
if (intoItem.isDeviceExistPlaceholder()) {
@@ -53,4 +67,170 @@ public class IntoComponent extends StatementNode {
}
return false;
}
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // used in ALIGN BY TIME
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /** Checks that the INTO items can be matched with {@code sourceColumns} (ALIGN BY TIME). */
+ public void validate(List<Expression> sourceColumns) {
+   // Placeholders are only permitted when every source column is a raw series.
+   if (!checkIsAllRawSeriesQuery(sourceColumns)
+       && (isDeviceExistPlaceholder() || isMeasurementsExistPlaceholder())) {
+     throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+   }
+
+   if (!isMeasurementsExistPlaceholder()) {
+     // Only explicit measurements: their total must equal the number of source columns.
+     int targetPathCount = 0;
+     for (IntoItem item : intoItems) {
+       targetPathCount += item.getIntoMeasurements().size();
+     }
+     if (targetPathCount != sourceColumns.size()) {
+       throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+     }
+     return;
+   }
+   // Measurement placeholders in use: every item must carry exactly one measurement entry.
+   for (IntoItem item : intoItems) {
+     if (item.getIntoMeasurements().size() != 1) {
+       throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+     }
+   }
+   boolean devicePlaceholder = isDeviceExistPlaceholder();
+   if (devicePlaceholder && intoItems.size() != 1) {
+     throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+   }
+   if (!devicePlaceholder && intoItems.size() != sourceColumns.size()) {
+     throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+   }
+ }
+
+ public IntoPathIterator getIntoPathIterator() { // cursor used for ALIGN BY TIME
+   boolean devicePlaceholder = isDeviceExistPlaceholder();
+   return new IntoPathIterator(intoItems, devicePlaceholder, isMeasurementsExistPlaceholder());
+ }
+
+ /** Cursor over the target path templates of the INTO clause for ALIGN BY TIME queries. */
+ public static class IntoPathIterator extends AbstractIntoIterator {
+   public IntoPathIterator(
+       List<IntoItem> intoItems,
+       boolean isDeviceExistPlaceholder,
+       boolean isMeasurementsExistPlaceholder) {
+     super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+   }
+
+   /** Advances the cursor to the next target path template. */
+   public void next() {
+     if (!isMeasurementsExistPlaceholder) {
+       // Explicit measurements: walk the current item's list, then roll over to the next item.
+       measurementIndex++;
+       if (measurementIndex == intoItems.get(deviceIndex).getIntoMeasurements().size()) {
+         deviceIndex++;
+         measurementIndex = 0;
+       }
+     } else if (!isDeviceExistPlaceholder) {
+       deviceIndex++;
+     }
+   }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+ // used in ALIGN BY DEVICE
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Checks that the INTO items match the source devices and columns (ALIGN BY DEVICE).
+  *
+  * @throws SemanticException if placeholders are misused or the numbers of devices/paths differ
+  */
+ public void validate(List<PartialPath> sourceDevices, List<Expression> sourceColumns) {
+   // Measurement placeholders are only permitted when every source column is a raw series.
+   if (!checkIsAllRawSeriesQuery(sourceColumns) && isMeasurementsExistPlaceholder()) {
+     throw new SemanticException(FORBID_PLACEHOLDER_ERROR_MSG);
+   }
+
+   // A device placeholder requires a single item; otherwise one item per source device.
+   boolean devicePlaceholder = isDeviceExistPlaceholder();
+   if (devicePlaceholder && intoItems.size() != 1) {
+     throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+   }
+   if (!devicePlaceholder && intoItems.size() != sourceDevices.size()) {
+     throw new SemanticException(DEVICE_NUM_MISMATCH_ERROR_MSG);
+   }
+
+   for (IntoItem item : intoItems) {
+     // A placeholder item holds one measurement entry; an explicit item needs one per column.
+     List<String> targets = item.getIntoMeasurements();
+     boolean measurementPlaceholder = item.isMeasurementsExistPlaceholder();
+     if (measurementPlaceholder && targets.size() != 1) {
+       throw new SemanticException(PLACEHOLDER_MISMATCH_ERROR_MSG);
+     }
+     if (!measurementPlaceholder && targets.size() != sourceColumns.size()) {
+       throw new SemanticException(PATH_NUM_MISMATCH_ERROR_MSG);
+     }
+   }
+ }
+
+ public IntoDeviceMeasurementIterator getIntoDeviceMeasurementIterator() { // for ALIGN BY DEVICE
+   boolean devPh = isDeviceExistPlaceholder();
+   return new IntoDeviceMeasurementIterator(intoItems, devPh, isMeasurementsExistPlaceholder());
+ }
+
+ /** Cursor over target device and measurement templates for ALIGN BY DEVICE queries. */
+ public static class IntoDeviceMeasurementIterator extends AbstractIntoIterator {
+   public IntoDeviceMeasurementIterator(
+       List<IntoItem> intoItems,
+       boolean isDeviceExistPlaceholder,
+       boolean isMeasurementsExistPlaceholder) {
+     super(intoItems, isDeviceExistPlaceholder, isMeasurementsExistPlaceholder);
+   }
+   /** Steps to the next device; measurements restart at 0 even when the same item is reused. */
+   public void nextDevice() {
+     if (!isDeviceExistPlaceholder) {
+       deviceIndex++;
+     }
+     measurementIndex = 0;
+   }
+   /** Steps to the next measurement unless the current item uses a measurement placeholder. */
+   public void nextMeasurement() {
+     if (!intoItems.get(deviceIndex).isMeasurementsExistPlaceholder()) {
+       measurementIndex++;
+     }
+   }
+ }
+
+ /** Shared cursor state for walking the items of an INTO clause. */
+ public abstract static class AbstractIntoIterator {
+   protected final List<IntoItem> intoItems;
+   protected final boolean isDeviceExistPlaceholder;
+   protected final boolean isMeasurementsExistPlaceholder;
+   protected int deviceIndex = 0; // position of the current item in intoItems
+   protected int measurementIndex = 0; // position in the current item's measurement list
+
+   public AbstractIntoIterator(
+       List<IntoItem> intoItems,
+       boolean isDeviceExistPlaceholder,
+       boolean isMeasurementsExistPlaceholder) {
+     this.intoItems = intoItems;
+     this.isDeviceExistPlaceholder = isDeviceExistPlaceholder;
+     this.isMeasurementsExistPlaceholder = isMeasurementsExistPlaceholder;
+   }
+
+   public PartialPath getDeviceTemplate() {
+     return currentItem().getIntoDevice();
+   }
+
+   public String getMeasurementTemplate() {
+     return currentItem().getIntoMeasurements().get(measurementIndex);
+   }
+
+   public boolean isAlignedDevice() {
+     return currentItem().isAligned();
+   }
+
+   private IntoItem currentItem() { // item the cursor currently points at
+     return intoItems.get(deviceIndex);
+   }
+ }
}