You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/15 06:21:20 UTC
[iotdb] branch master updated: Fix refactor query code,
reduce complexity in PhysicalGenerator (#2820)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e35a3e4 Fix refactor query code, reduce complexity in PhysicalGenerator (#2820)
e35a3e4 is described below
commit e35a3e48a7a5ab423e9a9f5cfb3ac7b01922f593
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Mon Mar 15 14:20:57 2021 +0800
Fix refactor query code, reduce complexity in PhysicalGenerator (#2820)
---
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 405 +++++++++++----------
1 file changed, 219 insertions(+), 186 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 4fddd2d..8332a88 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -430,12 +430,17 @@ public class PhysicalGenerator {
return SchemaUtils.getSeriesTypesByPaths(paths);
}
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
- throws QueryProcessException {
- QueryPlan queryPlan;
+ interface Transform {
+ QueryPlan transform(QueryOperator queryOperator, int fetchSize) throws QueryProcessException;
+ }
- if (queryOperator.hasAggregation()) {
+ /** Rule that transforms an aggregation query operator into its physical query plan. */
+ public static class AggPhysicalPlanRule implements Transform {
+
+ @Override
+ public QueryPlan transform(QueryOperator queryOperator, int fetchSize)
+ throws QueryProcessException {
+ QueryPlan queryPlan;
if (queryOperator.hasUdf()) {
throw new QueryProcessException(
"User-defined and built-in hybrid aggregation is not supported.");
@@ -481,7 +486,17 @@ public class PhysicalGenerator {
throw new QueryProcessException(e);
}
}
- } else if (queryOperator.isFill()) {
+ return queryPlan;
+ }
+ }
+
+ /** Rule that transforms a fill query operator into its physical query plan. */
+ public static class FillPhysicalPlanRule implements Transform {
+
+ @Override
+ public QueryPlan transform(QueryOperator queryOperator, int fetchSize)
+ throws QueryProcessException {
+ QueryPlan queryPlan;
if (queryOperator.hasUdf()) {
throw new QueryProcessException("Fill functions are not supported in UDF queries.");
}
@@ -493,6 +508,19 @@ public class PhysicalGenerator {
long time = Long.parseLong(((BasicFunctionOperator) timeFilter).getValue());
((FillQueryPlan) queryPlan).setQueryTime(time);
((FillQueryPlan) queryPlan).setFillType(queryOperator.getFillTypes());
+ return queryPlan;
+ }
+ }
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
+ throws QueryProcessException {
+ QueryPlan queryPlan = null;
+
+ if (queryOperator.hasAggregation()) {
+ queryPlan = new AggPhysicalPlanRule().transform(queryOperator, fetchSize);
+ } else if (queryOperator.isFill()) {
+ queryPlan = new FillPhysicalPlanRule().transform(queryOperator, fetchSize);
} else if (queryOperator.isLastQuery()) {
queryPlan = new LastQueryPlan();
} else if (queryOperator.getIndexType() != null) {
@@ -505,184 +533,7 @@ public class PhysicalGenerator {
}
if (queryOperator.isAlignByDevice()) {
- // below is the core realization of ALIGN_BY_DEVICE sql logic
- AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
- if (queryPlan instanceof GroupByTimePlan) {
- alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan);
- } else if (queryPlan instanceof FillQueryPlan) {
- alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
- } else if (queryPlan instanceof AggregationPlan) {
- if (((AggregationPlan) queryPlan).getLevel() >= 0) {
- throw new QueryProcessException("group by level does not support align by device now.");
- }
- alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
- }
-
- List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
- // remove stars in fromPaths and get deviceId with deduplication
- List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths);
- List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths();
- List<String> originAggregations = queryOperator.getSelectOperator().getAggregations();
-
- // to record result measurement columns
- List<String> measurements = new ArrayList<>();
- Map<String, String> measurementAliasMap = new HashMap<>();
- // to check the same measurement of different devices having the same datatype
- // record the data type of each column of result set
- Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
- Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
-
- // to record the real type of the corresponding measurement
- Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
- List<PartialPath> paths = new ArrayList<>();
-
- for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
- PartialPath suffixPath = suffixPaths.get(i);
-
- // to record measurements in the loop of a suffix path
- Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
-
- // if const measurement
- if (suffixPath.getMeasurement().startsWith("'")) {
- measurements.add(suffixPath.getMeasurement());
- measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
- continue;
- }
-
- for (PartialPath device : devices) { // per device in FROM after deduplication
-
- PartialPath fullPath = device.concatPath(suffixPath);
- try {
- // remove stars in SELECT to get actual paths
- List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
- if (suffixPath.isTsAliasExists()) {
- if (actualPaths.size() == 1) {
- String columnName = actualPaths.get(0).getMeasurement();
- if (originAggregations != null && !originAggregations.isEmpty()) {
- measurementAliasMap.put(
- originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias());
- } else {
- measurementAliasMap.put(columnName, suffixPath.getTsAlias());
- }
- } else if (actualPaths.size() >= 2) {
- throw new QueryProcessException(
- "alias '"
- + suffixPath.getTsAlias()
- + "' can only be matched with one time series");
- }
- }
-
- // for actual non exist path
- if (originAggregations != null
- && actualPaths.isEmpty()
- && originAggregations.isEmpty()) {
- String nonExistMeasurement = fullPath.getMeasurement();
- if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
- && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
- measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
- }
- }
-
- // Get data types with and without aggregate functions (actual time series) respectively
- // Data type with aggregation function `columnDataTypes` is used for:
- // 1. Data type consistency check 2. Header calculation, output result set
- // The actual data type of the time series `measurementDataTypes` is used for
- // the actual query in the AlignByDeviceDataSet
- String aggregation =
- originAggregations != null && !originAggregations.isEmpty()
- ? originAggregations.get(i)
- : null;
-
- Pair<List<TSDataType>, List<TSDataType>> pair =
- getSeriesTypes(actualPaths, aggregation);
- List<TSDataType> columnDataTypes = pair.left;
- List<TSDataType> measurementDataTypes = pair.right;
- for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
- PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
-
- // check datatype consistency
- // a example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by
- // device,
- // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
- String measurementChecked;
- if (originAggregations != null && !originAggregations.isEmpty()) {
- measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
- } else {
- measurementChecked = path.getMeasurement();
- }
- TSDataType columnDataType = columnDataTypes.get(pathIdx);
- if (columnDataTypeMap.containsKey(measurementChecked)) {
- if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
- throw new QueryProcessException(
- "The data types of the same measurement column should be the same across "
- + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
- + "SQL document.");
- }
- } else {
- columnDataTypeMap.put(measurementChecked, columnDataType);
- measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
- }
-
- // This step indicates that the measurement exists under the device and is correct,
- // First, update measurementSetOfGivenSuffix which is distinct
- // Then if this measurement is recognized as NonExist before,update it to Exist
- if (measurementSetOfGivenSuffix.add(measurementChecked)
- || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
- measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
- }
-
- // update paths
- paths.add(path);
- }
-
- } catch (MetadataException e) {
- throw new LogicalOptimizeException(
- String.format(
- "Error when getting all paths of a full path: %s", fullPath.getFullPath())
- + e.getMessage());
- }
- }
-
- // update measurements
- // Note that in the loop of a suffix path, set is used.
- // And across the loops of suffix paths, list is used.
- // e.g. select *,s1 from root.sg.d0, root.sg.d1
- // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
- // for suffix s1, measurementSetOfGivenSuffix = {s1}
- // therefore the final measurements is [s1,s2,s3,s1].
- measurements.addAll(measurementSetOfGivenSuffix);
- }
-
- // slimit trim on the measurementColumnList
- if (queryOperator.hasSlimit()) {
- int seriesSlimit = queryOperator.getSeriesLimit();
- int seriesOffset = queryOperator.getSeriesOffset();
- measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
- }
-
- int maxDeduplicatedPathNum =
- QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
-
- if (measurements.size() > maxDeduplicatedPathNum) {
- throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
- }
-
- // assigns to alignByDevicePlan
- alignByDevicePlan.setMeasurements(measurements);
- alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
- alignByDevicePlan.setDevices(devices);
- alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
- alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
- alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
- alignByDevicePlan.setPaths(paths);
-
- // get deviceToFilterMap
- FilterOperator filterOperator = queryOperator.getFilterOperator();
- if (filterOperator != null) {
- alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator));
- }
-
- queryPlan = alignByDevicePlan;
+ queryPlan = getAlignQueryPlan(queryOperator, fetchSize, queryPlan);
} else {
queryPlan.setPaths(queryOperator.getSelectedPaths());
// Last query result set will not be affected by alignment
@@ -731,6 +582,188 @@ public class PhysicalGenerator {
return queryPlan;
}
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+ private QueryPlan getAlignQueryPlan(
+ QueryOperator queryOperator, int fetchSize, QueryPlan queryPlan)
+ throws QueryProcessException {
+ // below is the core realization of ALIGN_BY_DEVICE sql logic
+ AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
+ if (queryPlan instanceof GroupByTimePlan) {
+ alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan);
+ } else if (queryPlan instanceof FillQueryPlan) {
+ alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
+ } else if (queryPlan instanceof AggregationPlan) {
+ if (((AggregationPlan) queryPlan).getLevel() >= 0) {
+ throw new QueryProcessException("group by level does not support align by device now.");
+ }
+ alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
+ }
+
+ List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
+ // remove stars in fromPaths and get deviceId with deduplication
+ List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths);
+ List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths();
+ List<String> originAggregations = queryOperator.getSelectOperator().getAggregations();
+
+ // to record result measurement columns
+ List<String> measurements = new ArrayList<>();
+ Map<String, String> measurementAliasMap = new HashMap<>();
+ // to check the same measurement of different devices having the same datatype
+ // record the data type of each column of result set
+ Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
+ Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
+
+ // to record the real type of the corresponding measurement
+ Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+ List<PartialPath> paths = new ArrayList<>();
+
+ for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
+ PartialPath suffixPath = suffixPaths.get(i);
+
+ // to record measurements in the loop of a suffix path
+ Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+ // if const measurement
+ if (suffixPath.getMeasurement().startsWith("'")) {
+ measurements.add(suffixPath.getMeasurement());
+ measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
+ continue;
+ }
+
+ for (PartialPath device : devices) { // per device in FROM after deduplication
+
+ PartialPath fullPath = device.concatPath(suffixPath);
+ try {
+ // remove stars in SELECT to get actual paths
+ List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
+ if (suffixPath.isTsAliasExists()) {
+ if (actualPaths.size() == 1) {
+ String columnName = actualPaths.get(0).getMeasurement();
+ if (originAggregations != null && !originAggregations.isEmpty()) {
+ measurementAliasMap.put(
+ originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias());
+ } else {
+ measurementAliasMap.put(columnName, suffixPath.getTsAlias());
+ }
+ } else if (actualPaths.size() >= 2) {
+ throw new QueryProcessException(
+ "alias '"
+ + suffixPath.getTsAlias()
+ + "' can only be matched with one time series");
+ }
+ }
+
+ // for actual non exist path
+ if (originAggregations != null && actualPaths.isEmpty() && originAggregations.isEmpty()) {
+ String nonExistMeasurement = fullPath.getMeasurement();
+ if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
+ && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
+ measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
+ }
+ }
+
+ // Get data types with and without aggregate functions (actual time series) respectively
+ // Data type with aggregation function `columnDataTypes` is used for:
+ // 1. Data type consistency check 2. Header calculation, output result set
+ // The actual data type of the time series `measurementDataTypes` is used for
+ // the actual query in the AlignByDeviceDataSet
+ String aggregation =
+ originAggregations != null && !originAggregations.isEmpty()
+ ? originAggregations.get(i)
+ : null;
+
+ Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, aggregation);
+ List<TSDataType> columnDataTypes = pair.left;
+ List<TSDataType> measurementDataTypes = pair.right;
+ for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
+ PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
+
+ // check datatype consistency
+ // an example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by
+ // device,
+ // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+ String measurementChecked;
+ if (originAggregations != null && !originAggregations.isEmpty()) {
+ measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
+ } else {
+ measurementChecked = path.getMeasurement();
+ }
+ TSDataType columnDataType = columnDataTypes.get(pathIdx);
+ if (columnDataTypeMap.containsKey(measurementChecked)) {
+ if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
+ throw new QueryProcessException(
+ "The data types of the same measurement column should be the same across "
+ + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
+ + "SQL document.");
+ }
+ } else {
+ columnDataTypeMap.put(measurementChecked, columnDataType);
+ measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
+ }
+
+ // This step indicates that the measurement exists under the device and is correct,
+ // First, update measurementSetOfGivenSuffix which is distinct
+ // Then if this measurement is recognized as NonExist before, update it to Exist
+ if (measurementSetOfGivenSuffix.add(measurementChecked)
+ || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
+ measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
+ }
+
+ // update paths
+ paths.add(path);
+ }
+
+ } catch (MetadataException e) {
+ throw new LogicalOptimizeException(
+ String.format(
+ "Error when getting all paths of a full path: %s", fullPath.getFullPath())
+ + e.getMessage());
+ }
+ }
+
+ // update measurements
+ // Note that in the loop of a suffix path, set is used.
+ // And across the loops of suffix paths, list is used.
+ // e.g. select *,s1 from root.sg.d0, root.sg.d1
+ // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
+ // for suffix s1, measurementSetOfGivenSuffix = {s1}
+ // therefore the final measurements is [s1,s2,s3,s1].
+ measurements.addAll(measurementSetOfGivenSuffix);
+ }
+
+ // slimit trim on the measurementColumnList
+ if (queryOperator.hasSlimit()) {
+ int seriesSlimit = queryOperator.getSeriesLimit();
+ int seriesOffset = queryOperator.getSeriesOffset();
+ measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
+ }
+
+ int maxDeduplicatedPathNum =
+ QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
+
+ if (measurements.size() > maxDeduplicatedPathNum) {
+ throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
+ }
+
+ // assigns to alignByDevicePlan
+ alignByDevicePlan.setMeasurements(measurements);
+ alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
+ alignByDevicePlan.setDevices(devices);
+ alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
+ alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+ alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
+ alignByDevicePlan.setPaths(paths);
+
+ // get deviceToFilterMap
+ FilterOperator filterOperator = queryOperator.getFilterOperator();
+ if (filterOperator != null) {
+ alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator));
+ }
+
+ queryPlan = alignByDevicePlan;
+ return queryPlan;
+ }
+
// e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
// [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
// root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
@@ -956,7 +989,7 @@ public class PhysicalGenerator {
return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
}
- private boolean verifyAllAggregationDataTypesEqual(QueryOperator queryOperator)
+ private static boolean verifyAllAggregationDataTypesEqual(QueryOperator queryOperator)
throws MetadataException {
List<String> aggregations = queryOperator.getSelectOperator().getAggregations();
if (aggregations.isEmpty()) {
@@ -964,7 +997,7 @@ public class PhysicalGenerator {
}
List<PartialPath> paths = queryOperator.getSelectedPaths();
- List<TSDataType> dataTypes = getSeriesTypes(paths);
+ List<TSDataType> dataTypes = SchemaUtils.getSeriesTypesByPaths(paths);
String aggType = aggregations.get(0);
switch (aggType) {
case SQLConstant.MIN_VALUE: