You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/03/16 11:13:38 UTC
[iotdb] 01/01: merge master and fix conflict
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch optimizeDDP
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b90b203dbccde29b808e8f6b74064adcf486162b
Merge: e4a953b 5540272
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue Mar 16 19:13:02 2021 +0800
merge master and fix conflict
docs/SystemDesign/StorageEngine/WAL.md | 6 +-
docs/zh/SystemDesign/StorageEngine/WAL.md | 7 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../db/engine/merge/manage/MergeResource.java | 27 ++
.../iotdb/db/engine/merge/task/MergeFileTask.java | 26 +-
.../db/engine/merge/task/MergeMultiChunkTask.java | 4 +
.../db/engine/storagegroup/TsFileResource.java | 10 +
.../storagegroup/timeindex/DeviceTimeIndex.java | 12 +
.../storagegroup/timeindex/FileTimeIndex.java | 12 +
.../engine/storagegroup/timeindex/ITimeIndex.java | 16 +
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 31 ++
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 391 +++++++++++----------
.../iotdb/db/engine/merge/MergeTaskTest.java | 55 +++
.../apache/iotdb/db/metadata/mnode/MNodeTest.java | 27 ++
14 files changed, 432 insertions(+), 194 deletions(-)
diff --cc server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 93f2af2,8332a88..c99f5dc
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@@ -723,6 -582,188 +581,181 @@@ public class PhysicalGenerator
return queryPlan;
}
+ @SuppressWarnings("squid:S3777") // Suppress high Cognitive Complexity warning
+ private QueryPlan getAlignQueryPlan(
+ QueryOperator queryOperator, int fetchSize, QueryPlan queryPlan)
+ throws QueryProcessException {
+ // Core of the ALIGN_BY_DEVICE logic: first wrap the incoming plan (group-by-time, fill or aggregation) in an AlignByDevicePlan
+ AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
+ if (queryPlan instanceof GroupByTimePlan) {
+ alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan);
+ } else if (queryPlan instanceof FillQueryPlan) {
+ alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
+ } else if (queryPlan instanceof AggregationPlan) {
+ if (((AggregationPlan) queryPlan).getLevel() >= 0) {
+ throw new QueryProcessException("group by level does not support align by device now.");
+ }
+ alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
+ }
+
+ List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths();
+ // expand the stars in the FROM prefix paths into a de-duplicated list of devices
+ List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths);
+ List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths();
+ List<String> originAggregations = queryOperator.getSelectOperator().getAggregations();
+
+ // result measurement columns, appended suffix by suffix
+ List<String> measurements = new ArrayList<>();
+ Map<String, String> measurementAliasMap = new HashMap<>();
+ // used to check that the same measurement has the same data type on every device:
+ // maps each column of the result set to its data type
+ Map<String, TSDataType> columnDataTypeMap = new HashMap<>();
+ Map<String, MeasurementType> measurementTypeMap = new HashMap<>();
+
+ // maps each column to the real data type of the underlying measurement (see getSeriesTypes below)
+ Map<String, TSDataType> measurementDataTypeMap = new HashMap<>();
+ List<PartialPath> paths = new ArrayList<>();
+
+ for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT
+ PartialPath suffixPath = suffixPaths.get(i);
+
+ // distinct measurements produced by this suffix path, kept in insertion order (LinkedHashSet)
+ Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+ // a measurement starting with a single quote is a constant column: record it and skip path matching
+ if (suffixPath.getMeasurement().startsWith("'")) {
+ measurements.add(suffixPath.getMeasurement());
+ measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant);
+ continue;
+ }
+
+ for (PartialPath device : devices) { // per device in FROM after deduplication
+
+ PartialPath fullPath = device.concatPath(suffixPath);
+ try {
+ // expand the stars in the SELECT suffix to get the actual matching paths
+ List<PartialPath> actualPaths = getMatchedTimeseries(fullPath);
+ if (suffixPath.isTsAliasExists()) {
+ if (actualPaths.size() == 1) {
+ String columnName = actualPaths.get(0).getMeasurement();
+ if (originAggregations != null && !originAggregations.isEmpty()) {
+ measurementAliasMap.put(
+ originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias());
+ } else {
+ measurementAliasMap.put(columnName, suffixPath.getTsAlias());
+ }
+ } else if (actualPaths.size() >= 2) {
+ throw new QueryProcessException(
+ "alias '"
+ + suffixPath.getTsAlias()
+ + "' can only be matched with one time series");
+ }
+ }
+
+ // no path matched and the aggregation list is non-null but empty: mark the measurement NonExist unless already Exist (NOTE(review): a null list skips this branch - confirm intended)
+ if (originAggregations != null && actualPaths.isEmpty() && originAggregations.isEmpty()) {
+ String nonExistMeasurement = fullPath.getMeasurement();
+ if (measurementSetOfGivenSuffix.add(nonExistMeasurement)
+ && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) {
+ measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist);
+ }
+ }
+
+ // getSeriesTypes returns two lists: data types with the aggregate function applied (left) and of the actual time series (right)
+ // `columnDataTypes` (with the aggregation function) is used for:
+ // 1. the data type consistency check 2. header calculation of the output result set
+ // `measurementDataTypes` (the actual data type of the time series) is used for
+ // the actual query in the AlignByDeviceDataSet
+ String aggregation =
+ originAggregations != null && !originAggregations.isEmpty()
+ ? originAggregations.get(i)
+ : null;
+
+ Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, aggregation);
+ List<TSDataType> columnDataTypes = pair.left;
+ List<TSDataType> measurementDataTypes = pair.right;
+ for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) {
+ PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes());
+
+ // check data type consistency
+ // an example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by
+ // device,
+ // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+ String measurementChecked;
+ if (originAggregations != null && !originAggregations.isEmpty()) {
+ measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")";
+ } else {
+ measurementChecked = path.getMeasurement();
+ }
+ TSDataType columnDataType = columnDataTypes.get(pathIdx);
+ if (columnDataTypeMap.containsKey(measurementChecked)) {
+ if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) {
+ throw new QueryProcessException(
+ "The data types of the same measurement column should be the same across "
+ + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the "
+ + "SQL document.");
+ }
+ } else {
+ columnDataTypeMap.put(measurementChecked, columnDataType);
+ measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx));
+ }
+
+ // At this point the measurement exists under the device and passed the type check.
+ // First, add it to measurementSetOfGivenSuffix (a set, so duplicates are ignored).
+ // Then, if it is new for this suffix or was recorded as NonExist before, mark it as Exist.
+ if (measurementSetOfGivenSuffix.add(measurementChecked)
+ || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) {
+ measurementTypeMap.put(measurementChecked, MeasurementType.Exist);
+ }
+
+ // record the concrete path
+ paths.add(path);
+ }
+
+ } catch (MetadataException e) {
+ throw new LogicalOptimizeException(
+ String.format(
+ "Error when getting all paths of a full path: %s", fullPath.getFullPath())
+ + e.getMessage());
+ }
+ }
+
+ // update measurements
+ // Note that within the loop of one suffix path a set is used,
+ // while across the loops of suffix paths a list is used, so duplicates between suffixes are kept.
+ // e.g. select *,s1 from root.sg.d0, root.sg.d1
+ // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
+ // for suffix s1, measurementSetOfGivenSuffix = {s1}
+ // therefore the final measurements are [s1,s2,s3,s1].
+ measurements.addAll(measurementSetOfGivenSuffix);
+ }
+
+ // apply the slimit/soffset trim to the measurement columns
+ if (queryOperator.hasSlimit()) {
+ int seriesSlimit = queryOperator.getSeriesLimit();
+ int seriesOffset = queryOperator.getSeriesOffset();
+ measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset);
+ }
+
- int maxDeduplicatedPathNum =
- QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize);
-
- if (measurements.size() > maxDeduplicatedPathNum) {
- throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size());
- }
-
+ // populate alignByDevicePlan with the collected results
+ alignByDevicePlan.setMeasurements(measurements);
+ alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap);
+ alignByDevicePlan.setDevices(devices);
+ alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap);
+ alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap);
+ alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap);
+ alignByDevicePlan.setPaths(paths);
+
+ // build the per-device filter map when a filter operator is present
+ FilterOperator filterOperator = queryOperator.getFilterOperator();
+ if (filterOperator != null) {
+ alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator));
+ }
+
+ queryPlan = alignByDevicePlan;
+ return queryPlan;
+ }
+
// e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
// [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
// root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10]