You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/16 11:13:37 UTC

[iotdb] branch optimizeDDP updated (e4a953b -> b90b203)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch optimizeDDP
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from e4a953b  optimize the structure of duduplicate()
     add 4f60e92  [IOTDB-1187] Fix unseq compaction loss data bug after delete operation (#2785)
     add e35a3e4  Fix refactor query code, reduce complexity in PhysicalGenerator (#2820)
     add daa50d7  optimize the doc that WAL has three ways to be flashed to disk (#2822)
     add 5540272  [ISSUE-2827] add an overrided addChild() method in MNode. (#2828)
     new b90b203  merge master and fix conflict

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/SystemDesign/StorageEngine/WAL.md             |   6 +-
 docs/zh/SystemDesign/StorageEngine/WAL.md          |   7 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../db/engine/merge/manage/MergeResource.java      |  27 ++
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  26 +-
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   4 +
 .../db/engine/storagegroup/TsFileResource.java     |  10 +
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  12 +
 .../storagegroup/timeindex/FileTimeIndex.java      |  12 +
 .../engine/storagegroup/timeindex/ITimeIndex.java  |  16 +
 .../org/apache/iotdb/db/metadata/mnode/MNode.java  |  31 ++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 391 +++++++++++----------
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  55 +++
 .../apache/iotdb/db/metadata/mnode/MNodeTest.java  |  27 ++
 14 files changed, 432 insertions(+), 194 deletions(-)


[iotdb] 01/01: merge master and fix conflict

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch optimizeDDP
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b90b203dbccde29b808e8f6b74064adcf486162b
Merge: e4a953b 5540272
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Mar 16 19:13:02 2021 +0800

    merge master and fix conflict

 docs/SystemDesign/StorageEngine/WAL.md             |   6 +-
 docs/zh/SystemDesign/StorageEngine/WAL.md          |   7 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../db/engine/merge/manage/MergeResource.java      |  27 ++
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  26 +-
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   4 +
 .../db/engine/storagegroup/TsFileResource.java     |  10 +
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  12 +
 .../storagegroup/timeindex/FileTimeIndex.java      |  12 +
 .../engine/storagegroup/timeindex/ITimeIndex.java  |  16 +
 .../org/apache/iotdb/db/metadata/mnode/MNode.java  |  31 ++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 391 +++++++++++----------
 .../iotdb/db/engine/merge/MergeTaskTest.java       |  55 +++
 .../apache/iotdb/db/metadata/mnode/MNodeTest.java  |  27 ++
 14 files changed, 432 insertions(+), 194 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 93f2af2,8332a88..c99f5dc
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@@ -723,6 -582,188 +581,181 @@@ public class PhysicalGenerator 
      return queryPlan;
    }
  
+   @SuppressWarnings("squid:S3777") // Suppress high Cognitive Complexity warning
+   private QueryPlan getAlignQueryPlan(
+       QueryOperator queryOperator, int fetchSize, QueryPlan queryPlan)
+       throws QueryProcessException {
+     // Core implementation of the ALIGN_BY_DEVICE sql logic: wraps queryPlan in an AlignByDevicePlan (fetchSize is not read here)
+     AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
+     if (queryPlan instanceof GroupByTimePlan) {
+       alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan);
+     } else if (queryPlan instanceof FillQueryPlan) {
+       alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
+     } else if (queryPlan instanceof AggregationPlan) {
+       if (((AggregationPlan) queryPlan).getLevel() >= 0) {
+         throw new QueryProcessException("group by level does not support align by device now.");
+       }
+       alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
+     }
+ 
+     List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
+     // remove stars in the FROM prefix paths and get the deduplicated devices
+     List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths);
+     List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths();
+     List<String> originAggregations = queryOperator.getSelectOperator().getAggregations();
+ 
+     // to record the measurement columns of the result
+     List<String> measurements = new ArrayList<>();
+     Map<String, String> measurementAliasMap = new HashMap<>();
+     // to check that the same measurement has the same data type across different devices:
+     // records the data type of each column of the result set
+     Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
+     Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
+ 
+     // to record the real data type of the corresponding measurement
+     Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+     List<PartialPath> paths = new ArrayList<>();
+ 
+     for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
+       PartialPath suffixPath = suffixPaths.get(i);
+ 
+       // to record the measurements seen in the loop of this suffix path (insertion-ordered, distinct)
+       Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+ 
+       // a constant measurement (starts with ') is recorded as Constant and skips the device loop
+       if (suffixPath.getMeasurement().startsWith("'")) {
+         measurements.add(suffixPath.getMeasurement());
+         measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
+         continue;
+       }
+ 
+       for (PartialPath device : devices) { // per device in FROM after deduplication
+ 
+         PartialPath fullPath = device.concatPath(suffixPath);
+         try {
+           // remove stars in SELECT to get the actual paths
+           List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
+           if (suffixPath.isTsAliasExists()) {
+             if (actualPaths.size() == 1) {
+               String columnName = actualPaths.get(0).getMeasurement();
+               if (originAggregations != null && !originAggregations.isEmpty()) {
+                 measurementAliasMap.put(
+                     originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias());
+               } else {
+                 measurementAliasMap.put(columnName, suffixPath.getTsAlias());
+               }
+             } else if (actualPaths.size() >= 2) {
+               throw new QueryProcessException(
+                   "alias '"
+                       + suffixPath.getTsAlias()
+                       + "' can only be matched with one time series");
+             }
+           }
+ 
+           // no actual path matched and no aggregation is used: mark as NonExist unless already Exist
+           if (originAggregations != null && actualPaths.isEmpty() && originAggregations.isEmpty()) {
+             String nonExistMeasurement = fullPath.getMeasurement();
+             if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
+                 && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
+               measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
+             }
+           }
+ 
+           // Get the data types with and without the aggregate function (actual time series) respectively.
+           // The data type with the aggregation function, `columnDataTypes`, is used for:
+           //  1. the data type consistency check 2. header calculation / the output result set
+           // The actual data type of the time series, `measurementDataTypes`, is used for
+           //  the actual query in the AlignByDeviceDataSet
+           String aggregation =
+               originAggregations != null && !originAggregations.isEmpty()
+                   ? originAggregations.get(i)
+                   : null;
+ 
+           Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, aggregation);
+           List<TSDataType> columnDataTypes = pair.left;
+           List<TSDataType> measurementDataTypes = pair.right;
+           for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
+             PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
+ 
+             // check data type consistency
+             // an example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by
+             // device,
+             // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+             String measurementChecked;
+             if (originAggregations != null && !originAggregations.isEmpty()) {
+               measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
+             } else {
+               measurementChecked = path.getMeasurement();
+             }
+             TSDataType columnDataType = columnDataTypes.get(pathIdx);
+             if (columnDataTypeMap.containsKey(measurementChecked)) {
+               if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
+                 throw new QueryProcessException(
+                     "The data types of the same measurement column should be the same across "
+                         + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
+                         + "SQL document.");
+               }
+             } else {
+               columnDataTypeMap.put(measurementChecked, columnDataType);
+               measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
+             }
+ 
+             // This step indicates that the measurement exists under the device and is correct.
+             // First, update measurementSetOfGivenSuffix, which is distinct.
+             // Then, if this measurement was recognized as NonExist before, update it to Exist.
+             if (measurementSetOfGivenSuffix.add(measurementChecked)
+                 || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
+               measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
+             }
+ 
+             // record the actual path
+             paths.add(path);
+           }
+ 
+         } catch (MetadataException e) {
+           throw new LogicalOptimizeException(
+               String.format(
+                       "Error when getting all paths of a full path: %s", fullPath.getFullPath())
+                   + e.getMessage());
+         }
+       }
+ 
+       // update measurements
+       // Note that within the loop of one suffix path, a set is used,
+       // while across the loops of suffix paths, a list is used.
+       // e.g. select *,s1 from root.sg.d0, root.sg.d1
+       // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
+       // for suffix s1, measurementSetOfGivenSuffix = {s1}
+       // therefore the final measurements are [s1,s2,s3,s1].
+       measurements.addAll(measurementSetOfGivenSuffix);
+     }
+ 
+     // apply the slimit/soffset trim to the measurement column list
+     if (queryOperator.hasSlimit()) {
+       int seriesSlimit = queryOperator.getSeriesLimit();
+       int seriesOffset = queryOperator.getSeriesOffset();
+       measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
+     }
+ 
 -    int maxDeduplicatedPathNum =
 -        QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
 -
 -    if (measurements.size() > maxDeduplicatedPathNum) {
 -      throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
 -    }
 -
+     // assign the collected results to alignByDevicePlan
+     alignByDevicePlan.setMeasurements(measurements);
+     alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
+     alignByDevicePlan.setDevices(devices);
+     alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
+     alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+     alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
+     alignByDevicePlan.setPaths(paths);
+ 
+     // build deviceToFilterMap, only when the query has a filter operator
+     FilterOperator filterOperator = queryOperator.getFilterOperator();
+     if (filterOperator != null) {
+       alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator));
+     }
+ 
+     queryPlan = alignByDevicePlan;
+     return queryPlan;
+   }
+ 
    // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
    // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
    //  root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]