You are viewing a plain-text version of this content; the canonical version is linked from the mailing-list archive.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/03/15 06:21:20 UTC

[iotdb] branch master updated: Fix refactor query code, reduce complexity in PhysicalGenerator (#2820)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e35a3e4  Fix refactor query code, reduce complexity in PhysicalGenerator (#2820)
e35a3e4 is described below

commit e35a3e48a7a5ab423e9a9f5cfb3ac7b01922f593
Author: wangchao316 <66...@users.noreply.github.com>
AuthorDate: Mon Mar 15 14:20:57 2021 +0800

    Fix refactor query code, reduce complexity in PhysicalGenerator (#2820)
---
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 405 +++++++++++----------
 1 file changed, 219 insertions(+), 186 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 4fddd2d..8332a88 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -430,12 +430,17 @@ public class PhysicalGenerator {
     return SchemaUtils.getSeriesTypesByPaths(paths);
   }
 
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
-      throws QueryProcessException {
-    QueryPlan queryPlan;
+  /** Rule that turns a QueryOperator into a QueryPlan. */ interface Transfrom {
+    QueryPlan transform(QueryOperator queryOperator, int fetchSize) throws QueryProcessException;
+  } // NOTE(review): "Transfrom" is likely a typo of "Transform"
 
-    if (queryOperator.hasAggregation()) {
+  /** agg physical plan transform */
+  public static class AggPhysicalPlanRule implements Transfrom {
+
+    @Override
+    public QueryPlan transform(QueryOperator queryOperator, int fetchSize)
+        throws QueryProcessException {
+      QueryPlan queryPlan;
       if (queryOperator.hasUdf()) {
         throw new QueryProcessException(
             "User-defined and built-in hybrid aggregation is not supported.");
@@ -481,7 +486,17 @@ public class PhysicalGenerator {
           throw new QueryProcessException(e);
         }
       }
-    } else if (queryOperator.isFill()) {
+      return queryPlan;
+    }
+  }
+
+  /** fill physical plan transform */
+  public static class FillPhysicalPlanRule implements Transfrom {
+
+    @Override
+    public QueryPlan transform(QueryOperator queryOperator, int fetchSize)
+        throws QueryProcessException {
+      QueryPlan queryPlan;
       if (queryOperator.hasUdf()) {
         throw new QueryProcessException("Fill functions are not supported in UDF queries.");
       }
@@ -493,6 +508,19 @@ public class PhysicalGenerator {
       long time = Long.parseLong(((BasicFunctionOperator) timeFilter).getValue());
       ((FillQueryPlan) queryPlan).setQueryTime(time);
       ((FillQueryPlan) queryPlan).setFillType(queryOperator.getFillTypes());
+      return queryPlan;
+    }
+  }
+
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize)
+      throws QueryProcessException {
+    QueryPlan queryPlan = null;
+
+    if (queryOperator.hasAggregation()) {
+      queryPlan = new AggPhysicalPlanRule().transform(queryOperator, fetchSize);
+    } else if (queryOperator.isFill()) {
+      queryPlan = new FillPhysicalPlanRule().transform(queryOperator, fetchSize);
     } else if (queryOperator.isLastQuery()) {
       queryPlan = new LastQueryPlan();
     } else if (queryOperator.getIndexType() != null) {
@@ -505,184 +533,7 @@ public class PhysicalGenerator {
     }
 
     if (queryOperator.isAlignByDevice()) {
-      // below is the core realization of ALIGN_BY_DEVICE sql logic
-      AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
-      if (queryPlan instanceof GroupByTimePlan) {
-        alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan);
-      } else if (queryPlan instanceof FillQueryPlan) {
-        alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
-      } else if (queryPlan instanceof AggregationPlan) {
-        if (((AggregationPlan) queryPlan).getLevel() >= 0) {
-          throw new QueryProcessException("group by level does not support align by device now.");
-        }
-        alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
-      }
-
-      List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
-      // remove stars in fromPaths and get deviceId with deduplication
-      List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths);
-      List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths();
-      List<String> originAggregations = queryOperator.getSelectOperator().getAggregations();
-
-      // to record result measurement columns
-      List<String> measurements = new ArrayList<>();
-      Map<String, String> measurementAliasMap = new HashMap<>();
-      // to check the same measurement of different devices having the same datatype
-      // record the data type of each column of result set
-      Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
-      Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
-
-      // to record the real type of the corresponding measurement
-      Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
-      List<PartialPath> paths = new ArrayList<>();
-
-      for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
-        PartialPath suffixPath = suffixPaths.get(i);
-
-        // to record measurements in the loop of a suffix path
-        Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
-
-        // if const measurement
-        if (suffixPath.getMeasurement().startsWith("'")) {
-          measurements.add(suffixPath.getMeasurement());
-          measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
-          continue;
-        }
-
-        for (PartialPath device : devices) { // per device in FROM after deduplication
-
-          PartialPath fullPath = device.concatPath(suffixPath);
-          try {
-            // remove stars in SELECT to get actual paths
-            List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
-            if (suffixPath.isTsAliasExists()) {
-              if (actualPaths.size() == 1) {
-                String columnName = actualPaths.get(0).getMeasurement();
-                if (originAggregations != null && !originAggregations.isEmpty()) {
-                  measurementAliasMap.put(
-                      originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias());
-                } else {
-                  measurementAliasMap.put(columnName, suffixPath.getTsAlias());
-                }
-              } else if (actualPaths.size() >= 2) {
-                throw new QueryProcessException(
-                    "alias '"
-                        + suffixPath.getTsAlias()
-                        + "' can only be matched with one time series");
-              }
-            }
-
-            // for actual non exist path
-            if (originAggregations != null
-                && actualPaths.isEmpty()
-                && originAggregations.isEmpty()) {
-              String nonExistMeasurement = fullPath.getMeasurement();
-              if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
-                  && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
-                measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
-              }
-            }
-
-            // Get data types with and without aggregate functions (actual time series) respectively
-            // Data type with aggregation function `columnDataTypes` is used for:
-            //  1. Data type consistency check 2. Header calculation, output result set
-            // The actual data type of the time series `measurementDataTypes` is used for
-            //  the actual query in the AlignByDeviceDataSet
-            String aggregation =
-                originAggregations != null && !originAggregations.isEmpty()
-                    ? originAggregations.get(i)
-                    : null;
-
-            Pair<List<TSDataType>, List<TSDataType>> pair =
-                getSeriesTypes(actualPaths, aggregation);
-            List<TSDataType> columnDataTypes = pair.left;
-            List<TSDataType> measurementDataTypes = pair.right;
-            for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
-              PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
-
-              // check datatype consistency
-              // a example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by
-              // device,
-              // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
-              String measurementChecked;
-              if (originAggregations != null && !originAggregations.isEmpty()) {
-                measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
-              } else {
-                measurementChecked = path.getMeasurement();
-              }
-              TSDataType columnDataType = columnDataTypes.get(pathIdx);
-              if (columnDataTypeMap.containsKey(measurementChecked)) {
-                if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
-                  throw new QueryProcessException(
-                      "The data types of the same measurement column should be the same across "
-                          + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
-                          + "SQL document.");
-                }
-              } else {
-                columnDataTypeMap.put(measurementChecked, columnDataType);
-                measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
-              }
-
-              // This step indicates that the measurement exists under the device and is correct,
-              // First, update measurementSetOfGivenSuffix which is distinct
-              // Then if this measurement is recognized as NonExist before,update it to Exist
-              if (measurementSetOfGivenSuffix.add(measurementChecked)
-                  || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
-                measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
-              }
-
-              // update paths
-              paths.add(path);
-            }
-
-          } catch (MetadataException e) {
-            throw new LogicalOptimizeException(
-                String.format(
-                        "Error when getting all paths of a full path: %s", fullPath.getFullPath())
-                    + e.getMessage());
-          }
-        }
-
-        // update measurements
-        // Note that in the loop of a suffix path, set is used.
-        // And across the loops of suffix paths, list is used.
-        // e.g. select *,s1 from root.sg.d0, root.sg.d1
-        // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
-        // for suffix s1, measurementSetOfGivenSuffix = {s1}
-        // therefore the final measurements is [s1,s2,s3,s1].
-        measurements.addAll(measurementSetOfGivenSuffix);
-      }
-
-      // slimit trim on the measurementColumnList
-      if (queryOperator.hasSlimit()) {
-        int seriesSlimit = queryOperator.getSeriesLimit();
-        int seriesOffset = queryOperator.getSeriesOffset();
-        measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
-      }
-
-      int maxDeduplicatedPathNum =
-          QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
-
-      if (measurements.size() > maxDeduplicatedPathNum) {
-        throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
-      }
-
-      // assigns to alignByDevicePlan
-      alignByDevicePlan.setMeasurements(measurements);
-      alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
-      alignByDevicePlan.setDevices(devices);
-      alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
-      alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
-      alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
-      alignByDevicePlan.setPaths(paths);
-
-      // get deviceToFilterMap
-      FilterOperator filterOperator = queryOperator.getFilterOperator();
-      if (filterOperator != null) {
-        alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator));
-      }
-
-      queryPlan = alignByDevicePlan;
+      queryPlan = getAlignQueryPlan(queryOperator, fetchSize, queryPlan);
     } else {
       queryPlan.setPaths(queryOperator.getSelectedPaths());
       // Last query result set will not be affected by alignment
@@ -731,6 +582,188 @@ public class PhysicalGenerator {
     return queryPlan;
   }
 
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private QueryPlan getAlignQueryPlan(
+      QueryOperator queryOperator, int fetchSize, QueryPlan queryPlan)
+      throws QueryProcessException {
+    // Core of the ALIGN_BY_DEVICE sql logic: wrap queryPlan into an AlignByDevicePlan.
+    AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
+    if (queryPlan instanceof GroupByTimePlan) {
+      alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan);
+    } else if (queryPlan instanceof FillQueryPlan) {
+      alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
+    } else if (queryPlan instanceof AggregationPlan) {
+      if (((AggregationPlan) queryPlan).getLevel() >= 0) {
+        throw new QueryProcessException("group by level does not support align by device now.");
+      }
+      alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
+    }
+
+    List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
+    // remove stars in fromPaths and get deviceId with deduplication
+    List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths);
+    List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths();
+    List<String> originAggregations = queryOperator.getSelectOperator().getAggregations();
+
+    // to record result measurement columns
+    List<String> measurements = new ArrayList<>();
+    Map<String, String> measurementAliasMap = new HashMap<>();
+    // to check the same measurement of different devices having the same datatype
+    // record the data type of each column of result set
+    Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
+    Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
+
+    // to record the real type of the corresponding measurement
+    Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+    List<PartialPath> paths = new ArrayList<>();
+
+    for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
+      PartialPath suffixPath = suffixPaths.get(i);
+
+      // to record measurements in the loop of a suffix path
+      Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+      // if const measurement
+      if (suffixPath.getMeasurement().startsWith("'")) {
+        measurements.add(suffixPath.getMeasurement());
+        measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
+        continue;
+      }
+
+      for (PartialPath device : devices) { // per device in FROM after deduplication
+
+        PartialPath fullPath = device.concatPath(suffixPath);
+        try {
+          // remove stars in SELECT to get actual paths
+          List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
+          if (suffixPath.isTsAliasExists()) {
+            if (actualPaths.size() == 1) {
+              String columnName = actualPaths.get(0).getMeasurement();
+              if (originAggregations != null && !originAggregations.isEmpty()) {
+                measurementAliasMap.put(
+                    originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias());
+              } else {
+                measurementAliasMap.put(columnName, suffixPath.getTsAlias());
+              }
+            } else if (actualPaths.size() >= 2) {
+              throw new QueryProcessException(
+                  "alias '"
+                      + suffixPath.getTsAlias()
+                      + "' can only be matched with one time series");
+            }
+          }
+
+          // for actual non exist path
+          if (originAggregations != null && actualPaths.isEmpty() && originAggregations.isEmpty()) {
+            String nonExistMeasurement = fullPath.getMeasurement();
+            if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
+                && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
+              measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
+            }
+          }
+
+          // Get data types with and without aggregate functions (actual time series) respectively
+          // Data type with aggregation function `columnDataTypes` is used for:
+          //  1. Data type consistency check 2. Header calculation, output result set
+          // The actual data type of the time series `measurementDataTypes` is used for
+          //  the actual query in the AlignByDeviceDataSet
+          String aggregation =
+              originAggregations != null && !originAggregations.isEmpty()
+                  ? originAggregations.get(i)
+                  : null;
+
+          Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, aggregation);
+          List<TSDataType> columnDataTypes = pair.left;
+          List<TSDataType> measurementDataTypes = pair.right;
+          for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
+            PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
+
+            // check datatype consistency
+            // an example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2
+            // align by device, while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is
+            // FLOAT.
+            String measurementChecked;
+            if (originAggregations != null && !originAggregations.isEmpty()) {
+              measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
+            } else {
+              measurementChecked = path.getMeasurement();
+            }
+            TSDataType columnDataType = columnDataTypes.get(pathIdx);
+            if (columnDataTypeMap.containsKey(measurementChecked)) {
+              if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
+                throw new QueryProcessException(
+                    "The data types of the same measurement column should be the same across "
+                        + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
+                        + "SQL document.");
+              }
+            } else {
+              columnDataTypeMap.put(measurementChecked, columnDataType);
+              measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
+            }
+
+            // This step indicates that the measurement exists under the device and is correct,
+            // First, update measurementSetOfGivenSuffix which is distinct
+            // Then if this measurement is recognized as NonExist before, update it to Exist
+            if (measurementSetOfGivenSuffix.add(measurementChecked)
+                || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
+              measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
+            }
+
+            // update paths
+            paths.add(path);
+          }
+
+        } catch (MetadataException e) {
+          throw new LogicalOptimizeException(
+              String.format(
+                      "Error when getting all paths of a full path: %s", fullPath.getFullPath())
+                  + e.getMessage());
+        }
+      }
+
+      // update measurements
+      // Note that in the loop of a suffix path, set is used.
+      // And across the loops of suffix paths, list is used.
+      // e.g. select *,s1 from root.sg.d0, root.sg.d1
+      // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
+      // for suffix s1, measurementSetOfGivenSuffix = {s1}
+      // therefore the final measurements is [s1,s2,s3,s1].
+      measurements.addAll(measurementSetOfGivenSuffix);
+    }
+
+    // slimit trim on the measurementColumnList
+    if (queryOperator.hasSlimit()) {
+      int seriesSlimit = queryOperator.getSeriesLimit();
+      int seriesOffset = queryOperator.getSeriesOffset();
+      measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
+    }
+
+    int maxDeduplicatedPathNum =
+        QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
+
+    if (measurements.size() > maxDeduplicatedPathNum) {
+      throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
+    }
+
+    // assigns to alignByDevicePlan
+    alignByDevicePlan.setMeasurements(measurements);
+    alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
+    alignByDevicePlan.setDevices(devices);
+    alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
+    alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+    alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
+    alignByDevicePlan.setPaths(paths);
+
+    // get deviceToFilterMap
+    FilterOperator filterOperator = queryOperator.getFilterOperator();
+    if (filterOperator != null) {
+      alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator));
+    }
+
+    // the align-by-device plan wraps and replaces the incoming queryPlan
+    return alignByDevicePlan;
+  }
+
   // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
   // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
   //  root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
@@ -956,7 +989,7 @@ public class PhysicalGenerator {
     return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
   }
 
-  private boolean verifyAllAggregationDataTypesEqual(QueryOperator queryOperator)
+  private static boolean verifyAllAggregationDataTypesEqual(QueryOperator queryOperator)
       throws MetadataException {
     List<String> aggregations = queryOperator.getSelectOperator().getAggregations();
     if (aggregations.isEmpty()) {
@@ -964,7 +997,7 @@ public class PhysicalGenerator {
     }
 
     List<PartialPath> paths = queryOperator.getSelectedPaths();
-    List<TSDataType> dataTypes = getSeriesTypes(paths);
+    List<TSDataType> dataTypes = SchemaUtils.getSeriesTypesByPaths(paths);
     String aggType = aggregations.get(0);
     switch (aggType) {
       case SQLConstant.MIN_VALUE: