You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/16 11:13:38 UTC

[iotdb] 01/01: merge master and fix conflict

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch optimizeDDP
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b90b203dbccde29b808e8f6b74064adcf486162b
Merge: e4a953b 5540272
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Mar 16 19:13:02 2021 +0800

    merge master and fix conflict

 docs/SystemDesign/StorageEngine/WAL.md             |   6 +-
 docs/zh/SystemDesign/StorageEngine/WAL.md          |   7 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../db/engine/merge/manage/MergeResource.java      |  27 ++
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  26 +-
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   4 +
 .../db/engine/storagegroup/TsFileResource.java     |  10 +
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  12 +
 .../storagegroup/timeindex/FileTimeIndex.java      |  12 +
 .../engine/storagegroup/timeindex/ITimeIndex.java  |  16 +
 .../org/apache/iotdb/db/metadata/mnode/MNode.java  |  31 ++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 391 +++++++++++----------
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  55 +++
 .../apache/iotdb/db/metadata/mnode/MNodeTest.java  |  27 ++
 14 files changed, 432 insertions(+), 194 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 93f2af2,8332a88..c99f5dc
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@@ -723,6 -582,188 +581,181 @@@ public class PhysicalGenerator 
      return queryPlan;
    }
  
+   @SuppressWarnings("squid:S3777") // Suppress high Cognitive Complexity warning
+   private QueryPlan getAlignQueryPlan(
+       QueryOperator queryOperator, int fetchSize, QueryPlan queryPlan)
+       throws QueryProcessException {
+     // Core of the ALIGN_BY_DEVICE logic: wrap queryPlan in an AlignByDevicePlan and fill its fields
+     AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
+     if (queryPlan instanceof GroupByTimePlan) {
+       alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan);
+     } else if (queryPlan instanceof FillQueryPlan) {
+       alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
+     } else if (queryPlan instanceof AggregationPlan) {
+       if (((AggregationPlan) queryPlan).getLevel() >= 0) {
+         throw new QueryProcessException("group by level does not support align by device now.");
+       }
+       alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
+     }
+ 
+     List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
+     // remove stars in the FROM prefix paths and get the de-duplicated list of devices
+     List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths);
+     List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths();
+     List<String> originAggregations = queryOperator.getSelectOperator().getAggregations();
+ 
+     // records the measurement columns of the result
+     List<String> measurements = new ArrayList<>();
+     Map<String, String> measurementAliasMap = new HashMap<>();
+     // used to check that the same measurement has the same data type on every device;
+     // records the data type of each column of the result set
+     Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
+     Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
+ 
+     // records the real (non-aggregated) data type of the corresponding measurement
+     Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+     List<PartialPath> paths = new ArrayList<>();
+ 
+     for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
+       PartialPath suffixPath = suffixPaths.get(i);
+ 
+       // distinct measurements collected for this suffix path, kept in insertion order
+       Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+ 
+       // a measurement starting with a single quote is a constant: record it and skip the device loop
+       if (suffixPath.getMeasurement().startsWith("'")) {
+         measurements.add(suffixPath.getMeasurement());
+         measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
+         continue;
+       }
+ 
+       for (PartialPath device : devices) { // per device in FROM after deduplication
+ 
+         PartialPath fullPath = device.concatPath(suffixPath);
+         try {
+           // remove stars in SELECT to get the actual paths matched by device + suffix
+           List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
+           if (suffixPath.isTsAliasExists()) {
+             if (actualPaths.size() == 1) {
+               String columnName = actualPaths.get(0).getMeasurement();
+               if (originAggregations != null && !originAggregations.isEmpty()) {
+                 measurementAliasMap.put(
+                     originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias());
+               } else {
+                 measurementAliasMap.put(columnName, suffixPath.getTsAlias());
+               }
+             } else if (actualPaths.size() >= 2) {
+               throw new QueryProcessException(
+                   "alias '"
+                       + suffixPath.getTsAlias()
+                       + "' can only be matched with one time series");
+             }
+           }
+ 
+           // no path matched and the aggregation list is non-null but empty: mark NonExist unless Exist
+           if (originAggregations != null && actualPaths.isEmpty() && originAggregations.isEmpty()) {
+             String nonExistMeasurement = fullPath.getMeasurement();
+             if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
+                 && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
+               measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
+             }
+           }
+ 
+           // Get the data types with and without the aggregate function (actual time series).
+           // The data type with the aggregation function, `columnDataTypes`, is used for:
+           //  1. the data type consistency check 2. header calculation of the output result set
+           // The actual data type of the time series, `measurementDataTypes`, is used for
+           //  the actual query in the AlignByDeviceDataSet
+           String aggregation =
+               originAggregations != null && !originAggregations.isEmpty()
+                   ? originAggregations.get(i)
+                   : null;
+ 
+           Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, aggregation);
+           List<TSDataType> columnDataTypes = pair.left;
+           List<TSDataType> measurementDataTypes = pair.right;
+           for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
+             PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
+ 
+             // check data type consistency
+             // an example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by
+             // device,
+             // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+             String measurementChecked;
+             if (originAggregations != null && !originAggregations.isEmpty()) {
+               measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
+             } else {
+               measurementChecked = path.getMeasurement();
+             }
+             TSDataType columnDataType = columnDataTypes.get(pathIdx);
+             if (columnDataTypeMap.containsKey(measurementChecked)) {
+               if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
+                 throw new QueryProcessException(
+                     "The data types of the same measurement column should be the same across "
+                         + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
+                         + "SQL document.");
+               }
+             } else {
+               columnDataTypeMap.put(measurementChecked, columnDataType);
+               measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
+             }
+ 
+             // Reaching here means the measurement exists under the device and passed the type check.
+             // First, add it to measurementSetOfGivenSuffix, which keeps its entries distinct.
+             // Then, if it is new to the set or not yet marked Exist (e.g. NonExist), mark it Exist.
+             if (measurementSetOfGivenSuffix.add(measurementChecked)
+                 || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
+               measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
+             }
+ 
+             // record the matched path (one entry per device and matched series)
+             paths.add(path);
+           }
+ 
+         } catch (MetadataException e) {
+           throw new LogicalOptimizeException(
+               String.format(
+                       "Error when getting all paths of a full path: %s", fullPath.getFullPath())
+                   + e.getMessage());
+         }
+       }
+ 
+       // update measurements
+       // Note that within the loop of one suffix path, a set is used,
+       // while across the loops of different suffix paths, a list is used.
+       // e.g. select *,s1 from root.sg.d0, root.sg.d1
+       // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
+       // for suffix s1, measurementSetOfGivenSuffix = {s1}
+       // therefore the final measurements list is [s1,s2,s3,s1].
+       measurements.addAll(measurementSetOfGivenSuffix);
+     }
+ 
+     // trim the measurement column list by the series limit and series offset, if slimit is set
+     if (queryOperator.hasSlimit()) {
+       int seriesSlimit = queryOperator.getSeriesLimit();
+       int seriesOffset = queryOperator.getSeriesOffset();
+       measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
+     }
+ 
 -    int maxDeduplicatedPathNum =
 -        QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
 -
 -    if (measurements.size() > maxDeduplicatedPathNum) {
 -      throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
 -    }
 -
+     // copy everything collected above into alignByDevicePlan
+     alignByDevicePlan.setMeasurements(measurements);
+     alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
+     alignByDevicePlan.setDevices(devices);
+     alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
+     alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+     alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
+     alignByDevicePlan.setPaths(paths);
+ 
+     // build the device-to-filter map, only when the query has a filter
+     FilterOperator filterOperator = queryOperator.getFilterOperator();
+     if (filterOperator != null) {
+       alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator));
+     }
+ 
+     queryPlan = alignByDevicePlan;
+     return queryPlan;
+   }
+ 
    // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
    // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
    //  root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]