You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/02/23 08:38:28 UTC
[iotdb] branch master updated: [IOTDB-1024] Support multiple
aggregated measurements for group by level statement (#2714)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3aed578 [IOTDB-1024] Support multiple aggregated measurements for group by level statement (#2714)
3aed578 is described below
commit 3aed5786f64bc2f5fc2a4b206747632fc1f4f906
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Tue Feb 23 16:37:59 2021 +0800
[IOTDB-1024] Support multiple aggregated measurements for group by level statement (#2714)
---
.../DML Data Manipulation Language.md | 38 +++--
.../DML Data Manipulation Language.md | 36 +++--
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 35 +++++
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 11 ++
.../qp/strategy/optimizer/ConcatPathOptimizer.java | 11 --
.../db/query/aggregation/impl/AvgAggrResult.java | 17 +++
.../query/dataset/groupby/GroupByTimeDataSet.java | 6 +-
.../db/query/executor/AggregationExecutor.java | 12 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 8 +-
.../org/apache/iotdb/db/utils/FilePathUtils.java | 155 ++++++++++-----------
.../aggregation/IoTDBAggregationByLevelIT.java | 86 ++++++++----
11 files changed, 257 insertions(+), 158 deletions(-)
diff --git a/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md b/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
index dc25eea..457b693 100644
--- a/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
+++ b/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
@@ -419,21 +419,41 @@ It costs 0.003s
```
-Assuming another timeseries is added, called "root.ln.wf02.wt01.status".
-To query the number of "status" points of both two paths "root.ln.wf01" and "root.ln.wf02".
+Suppose we add another two timeseries, "root.ln.wf01.wt01.temperature" and "root.ln.wf02.wt01.temperature".
+To query the count and the sum of "temperature" under path "root.ln.*.*",
+aggregating on level=2, use the following statement:
+
```
-select count(status) from root.ln.*.* group by level=2
+select count(temperature), sum(temperature) from root.ln.*.* group by level=2
```
Result:
```
-+----------------------------+----------------------------+
-|COUNT(root.ln.wf01.*.status)|COUNT(root.ln.wf02.*.status)|
-+----------------------------+----------------------------+
-| 10080| 10082|
-+----------------------------+----------------------------+
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+|count(root.ln.wf02.*.temperature)|count(root.ln.wf01.*.temperature)|sum(root.ln.wf02.*.temperature)|sum(root.ln.wf01.*.temperature)|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+| 8| 4| 228.0| 91.83000183105469|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
Total line number = 1
-It costs 0.003s
+It costs 0.013s
+```
+
+To query the count and the sum of path "root.ln.\*.\*.temperature" aggregated at the "root.ln" level,
+simply set level=1:
+
+```
+select count(temperature), sum(temperature) from root.ln.*.* group by level=1
+```
+Result:
+
+```
++------------------------------+----------------------------+
+|count(root.ln.*.*.temperature)|sum(root.ln.*.*.temperature)|
++------------------------------+----------------------------+
+| 12| 319.8300018310547|
++------------------------------+----------------------------+
+Total line number = 1
+It costs 0.013s
```
All supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value.
diff --git a/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md b/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
index db951ff..42b0e1d 100644
--- a/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
+++ b/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
@@ -469,7 +469,7 @@ select count(status) from root.ln.wf01.* group by level=2
```
+----------------------------+
-|COUNT(root.ln.wf01.*.status)|
+|count(root.ln.wf01.*.status)|
+----------------------------+
| 10080|
+----------------------------+
@@ -478,21 +478,37 @@ It costs 0.003s
```
-假设此时在"root.ln"下面加入名为wf02的子序列,如"root.ln.wf02.wt01.status"。
-需要同时查询"root.ln.wf01"和"root.ln.wf02"下各自status子序列的点个数。则使用查询:
+假设此时添加两条序列,"root.ln.wf01.wt01.temperature"和"root.ln.wf02.wt01.temperature"。
+需要同时查询"root.ln.\*.\*.temperature"在第二层级的count聚合结果和sum聚合结果,可以使用下列查询语句:
```
-select count(status) from root.ln.*.* group by level=2
+select count(temperature), sum(temperature) from root.ln.*.* group by level=2
```
运行结果:
```
-+----------------------------+----------------------------+
-|COUNT(root.ln.wf01.*.status)|COUNT(root.ln.wf02.*.status)|
-+----------------------------+----------------------------+
-| 10080| 10082|
-+----------------------------+----------------------------+
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+|count(root.ln.wf02.*.temperature)|count(root.ln.wf01.*.temperature)|sum(root.ln.wf02.*.temperature)|sum(root.ln.wf01.*.temperature)|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+| 8| 4| 228.0| 91.83000183105469|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
Total line number = 1
-It costs 0.003s
+It costs 0.013s
+```
+
+若统计"root.ln.\*.\*"下第一层级的count聚合结果和sum聚合结果,则设置level=1即可:
+```
+select count(temperature), sum(temperature) from root.ln.*.* group by level=1
+```
+运行结果:
+
+```
++------------------------------+----------------------------+
+|count(root.ln.*.*.temperature)|sum(root.ln.*.*.temperature)|
++------------------------------+----------------------------+
+| 12| 319.8300018310547|
++------------------------------+----------------------------+
+Total line number = 1
+It costs 0.013s
```
分层聚合查询也可被用于其他聚合函数,当前所支持的聚合函数为:count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 9094c43..95e12da 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -18,10 +18,19 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
public class AggregationPlan extends RawDataQueryPlan {
@@ -33,6 +42,8 @@ public class AggregationPlan extends RawDataQueryPlan {
private List<String> deduplicatedAggregations = new ArrayList<>();
private int level = -1;
+ // group by level aggregation result path
+ private final Map<String, AggregateResult> levelAggPaths = new LinkedHashMap<>();
public AggregationPlan() {
super();
@@ -67,4 +78,28 @@ public class AggregationPlan extends RawDataQueryPlan {
public void setLevel(int level) {
this.level = level;
}
+
+ public Map<String, AggregateResult> getAggPathByLevel() throws QueryProcessException {
+ if (!levelAggPaths.isEmpty()) {
+ return levelAggPaths;
+ }
+ List<PartialPath> seriesPaths = getPaths();
+ List<TSDataType> dataTypes = getDataTypes();
+ try {
+ for (int i = 0; i < seriesPaths.size(); i++) {
+ String transformedPath =
+ FilePathUtils.generatePartialPathByLevel(seriesPaths.get(i).getFullPath(), getLevel());
+ String key = getAggregations().get(i) + "(" + transformedPath + ")";
+ if (!levelAggPaths.containsKey(key)) {
+ AggregateResult aggRet =
+ AggregateResultFactory.getAggrResultByName(
+ getAggregations().get(i), dataTypes.get(i));
+ levelAggPaths.put(key, aggRet);
+ }
+ }
+ } catch (IllegalPathException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ return levelAggPaths;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 59ccb66..a454661 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -114,6 +114,7 @@ import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.udf.core.context.UDFContext;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -876,6 +877,16 @@ public class PhysicalGenerator {
.getContext()
.getColumnName()
: columnForReader;
+ if (queryPlan instanceof AggregationPlan && ((AggregationPlan) queryPlan).getLevel() >= 0) {
+ String aggregatePath =
+ originalPath.isMeasurementAliasExists()
+ ? FilePathUtils.generatePartialPathByLevel(
+ originalPath.getFullPathWithAlias(), ((AggregationPlan) queryPlan).getLevel())
+ : FilePathUtils.generatePartialPathByLevel(
+ originalPath.toString(), ((AggregationPlan) queryPlan).getLevel());
+ columnForDisplay =
+ queryPlan.getAggregations().get(originalIndex) + "(" + aggregatePath + ")";
+ }
if (!columnForDisplaySet.contains(columnForDisplay)) {
queryPlan.addPathToIndex(columnForDisplay, queryPlan.getPathToIndex().size());
if (queryPlan instanceof UDTFPlan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index f74d82e..dea829f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -88,9 +88,6 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
}
checkAggrOfSelectOperator(select);
- if (((QueryOperator) operator).isGroupByLevel()) {
- checkAggrOfGroupByLevel(select);
- }
boolean isAlignByDevice = false;
if (operator instanceof QueryOperator) {
@@ -165,14 +162,6 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
}
}
- private void checkAggrOfGroupByLevel(SelectOperator selectOperator)
- throws LogicalOptimizeException {
- if (selectOperator.getAggregations().size() != 1) {
- throw new LogicalOptimizeException(
- "Aggregation function is restricted to one if group by level clause exists");
- }
- }
-
private void extendListSafely(List<String> source, int index, List<String> target) {
if (source != null && !source.isEmpty()) {
target.add(source.get(index));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index fb30a97..237fa08 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -135,6 +135,23 @@ public class AvgAggrResult extends AggregateResult {
cnt++;
}
+ public void setAvgResult(TSDataType type, Object val) throws UnSupportedDataTypeException {
+ cnt = 1;
+ switch (type) {
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ avg = (double) val;
+ break;
+ case TEXT:
+ case BOOLEAN:
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Unsupported data type in aggregation AVG : %s", type));
+ }
+ }
+
@Override
public boolean hasFinalResult() {
return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 46287f0..0a75fc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,8 +60,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
logger.debug("paths " + this.paths + " level:" + plan.getLevel());
}
- Map<Integer, String> pathIndex = new HashMap<>();
- Map<String, AggregateResult> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
+ Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
// get all records from GroupByDataSet, then we merge every record
if (logger.isDebugEnabled()) {
@@ -72,7 +70,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
RowRecord rawRecord = dataSet.nextWithoutConstraint();
RowRecord curRecord = new RowRecord(rawRecord.getTimestamp());
List<AggregateResult> mergedAggResults =
- FilePathUtils.mergeRecordByPath(plan, rawRecord, finalPaths, pathIndex);
+ FilePathUtils.mergeRecordByPath(plan, rawRecord, finalPaths);
for (AggregateResult resultData : mergedAggResults) {
TSDataType dataType = resultData.getResultDataType();
curRecord.addField(resultData.getResult(), dataType);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0fd00fa..4c54ac9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -334,7 +334,7 @@ public class AggregationExecutor {
*
* @param context query context.
*/
- public QueryDataSet executeWithValueFilter(QueryContext context, RawDataQueryPlan queryPlan)
+ public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
throws StorageEngineException, IOException, QueryProcessException {
int index = 0;
for (; index < aggregations.size(); index++) {
@@ -426,7 +426,7 @@ public class AggregationExecutor {
* @param aggregateResultList aggregate result list
*/
private QueryDataSet constructDataSet(
- List<AggregateResult> aggregateResultList, RawDataQueryPlan plan)
+ List<AggregateResult> aggregateResultList, AggregationPlan plan)
throws QueryProcessException {
RowRecord record = new RowRecord(0);
for (AggregateResult resultData : aggregateResultList) {
@@ -435,13 +435,11 @@ public class AggregationExecutor {
}
SingleDataSet dataSet;
- if (((AggregationPlan) plan).getLevel() >= 0) {
- Map<Integer, String> pathIndex = new HashMap<>();
- Map<String, AggregateResult> finalPaths =
- FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
+ if (plan.getLevel() >= 0) {
+ Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
List<AggregateResult> mergedAggResults =
- FilePathUtils.mergeRecordByPath(aggregateResultList, finalPaths, pathIndex);
+ FilePathUtils.mergeRecordByPath(plan, aggregateResultList, finalPaths);
List<PartialPath> paths = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 056109c..55b5a1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -81,7 +81,6 @@ import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
import org.apache.iotdb.db.query.dataset.UDTFDataSet;
import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
-import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.rpc.RpcUtils;
@@ -139,7 +138,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -767,11 +765,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
// same.
return StaticResps.LAST_RESP.deepCopy();
} else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
- Map<Integer, String> pathIndex = new HashMap<>();
- Map<String, AggregateResult> finalPaths =
- FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
+ Map<String, AggregateResult> finalPaths = ((AggregationPlan) plan).getAggPathByLevel();
for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
- respColumns.add(entry.getValue().getAggregationType() + "(" + entry.getKey() + ")");
+ respColumns.add(entry.getKey());
columnsTypes.add(entry.getValue().getResultDataType().toString());
}
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 36548d6..9555e9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.aggregation.impl.AvgAggrResult;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -38,7 +40,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
@@ -124,53 +125,31 @@ public class FilePathUtils {
}
/**
- * get paths from group by level, like root.sg1.d2.s0, root.sg1.d1.s0 level=1, return
- * [root.sg1.*.s0, 0] and pathIndex turns to be [[0, root.sg1.*.s0], [1, root.sg1.*.s0]]
+ * Transforms originalPath into a partial path for the given level: nodes deeper than the level,
+ * except the last one, are replaced by "*". E.g. generatePartialPathByLevel("root.sg.dh.d1.s1", 2)
+ * returns "root.sg.dh.*.s1".
*
- * @param plan the original Aggregation Plan
- * @param pathIndex the mapping from index of aggregations to the result path name
- * @return
+ * @param originalPath the original timeseries path
+ * @param pathLevel the expected path level
+ * @return result partial path
*/
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- public static Map<String, AggregateResult> getPathByLevel(
- AggregationPlan plan, Map<Integer, String> pathIndex) throws QueryProcessException {
- // pathGroupByLevel -> count
- Map<String, AggregateResult> finalPaths = new TreeMap<>();
-
- List<PartialPath> seriesPaths = plan.getDeduplicatedPaths();
- List<TSDataType> dataTypes = plan.getDeduplicatedDataTypes();
- for (int i = 0; i < seriesPaths.size(); i++) {
- String[] tmpPath;
- try {
- tmpPath = MetaUtils.splitPathToDetachedPath(seriesPaths.get(i).getFullPath());
- } catch (IllegalPathException e) {
- throw new QueryProcessException(e.getMessage());
- }
-
- String key;
- if (tmpPath.length <= plan.getLevel()) {
- key = seriesPaths.get(i).getFullPath();
+ public static String generatePartialPathByLevel(String originalPath, int pathLevel)
+ throws IllegalPathException {
+ String[] tmpPath = MetaUtils.splitPathToDetachedPath(originalPath);
+ if (tmpPath.length <= pathLevel) {
+ return originalPath;
+ }
+ StringBuilder transformedPath = new StringBuilder();
+ transformedPath.append(tmpPath[0]);
+ for (int k = 1; k < tmpPath.length - 1; k++) {
+ if (k <= pathLevel) {
+ transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]);
} else {
- StringBuilder path = new StringBuilder();
- path.append(tmpPath[0]);
- for (int k = 1; k < tmpPath.length - 1; k++) {
- if (k <= plan.getLevel()) {
- path.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]);
- } else {
- path.append(TsFileConstant.PATH_SEPARATOR).append(IoTDBConstant.PATH_WILDCARD);
- }
- }
- path.append(TsFileConstant.PATH_SEPARATOR).append(seriesPaths.get(i).getMeasurement());
- key = path.toString();
+ transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(IoTDBConstant.PATH_WILDCARD);
}
- AggregateResult aggRet =
- AggregateResultFactory.getAggrResultByName(
- plan.getAggregations().get(i), dataTypes.get(i));
- finalPaths.putIfAbsent(key, aggRet);
- pathIndex.put(i, key);
}
-
- return finalPaths;
+ transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[tmpPath.length - 1]);
+ return transformedPath.toString();
}
/**
@@ -179,14 +158,11 @@ public class FilePathUtils {
*
* @param newRecord
* @param finalPaths
- * @param pathIndex
* @return
*/
public static List<AggregateResult> mergeRecordByPath(
- AggregationPlan plan,
- RowRecord newRecord,
- Map<String, AggregateResult> finalPaths,
- Map<Integer, String> pathIndex) {
+ AggregationPlan plan, RowRecord newRecord, Map<String, AggregateResult> finalPaths)
+ throws QueryProcessException {
if (newRecord.getFields().size() < finalPaths.size()) {
return Collections.emptyList();
}
@@ -195,43 +171,50 @@ public class FilePathUtils {
if (newRecord.getFields().get(i) == null) {
aggregateResultList.add(
AggregateResultFactory.getAggrResultByName(
- plan.getAggregations().get(i), plan.getDeduplicatedDataTypes().get(i)));
+ plan.getDeduplicatedAggregations().get(i), plan.getDeduplicatedDataTypes().get(i)));
} else {
TSDataType dataType = newRecord.getFields().get(i).getDataType();
AggregateResult aggRet =
- AggregateResultFactory.getAggrResultByName(plan.getAggregations().get(i), dataType);
- switch (dataType) {
- case TEXT:
- aggRet.setBinaryValue(newRecord.getFields().get(i).getBinaryV());
- break;
- case INT32:
- aggRet.setIntValue(newRecord.getFields().get(i).getIntV());
- break;
- case INT64:
- aggRet.setLongValue(newRecord.getFields().get(i).getLongV());
- break;
- case FLOAT:
- aggRet.setFloatValue(newRecord.getFields().get(i).getFloatV());
- break;
- case DOUBLE:
- aggRet.setDoubleValue(newRecord.getFields().get(i).getDoubleV());
- break;
- case BOOLEAN:
- aggRet.setBooleanValue(newRecord.getFields().get(i).getBoolV());
- break;
- default:
- throw new UnSupportedDataTypeException(dataType.toString());
+ AggregateResultFactory.getAggrResultByName(
+ plan.getDeduplicatedAggregations().get(i), dataType);
+ if (aggRet.getAggregationType().equals(AggregationType.AVG)) {
+ ((AvgAggrResult) aggRet)
+ .setAvgResult(dataType, newRecord.getFields().get(i).getDoubleV());
+ } else {
+ switch (dataType) {
+ case TEXT:
+ aggRet.setBinaryValue(newRecord.getFields().get(i).getBinaryV());
+ break;
+ case INT32:
+ aggRet.setIntValue(newRecord.getFields().get(i).getIntV());
+ break;
+ case INT64:
+ aggRet.setLongValue(newRecord.getFields().get(i).getLongV());
+ break;
+ case FLOAT:
+ aggRet.setFloatValue(newRecord.getFields().get(i).getFloatV());
+ break;
+ case DOUBLE:
+ aggRet.setDoubleValue(newRecord.getFields().get(i).getDoubleV());
+ break;
+ case BOOLEAN:
+ aggRet.setBooleanValue(newRecord.getFields().get(i).getBoolV());
+ break;
+ default:
+ throw new UnSupportedDataTypeException(dataType.toString());
+ }
}
aggregateResultList.add(aggRet);
}
}
- return mergeRecordByPath(aggregateResultList, finalPaths, pathIndex);
+ return mergeRecordByPath(plan, aggregateResultList, finalPaths);
}
public static List<AggregateResult> mergeRecordByPath(
+ AggregationPlan plan,
List<AggregateResult> aggResults,
- Map<String, AggregateResult> finalPaths,
- Map<Integer, String> pathIndex) {
+ Map<String, AggregateResult> finalPaths)
+ throws QueryProcessException {
if (aggResults.size() < finalPaths.size()) {
return Collections.emptyList();
}
@@ -240,16 +223,24 @@ public class FilePathUtils {
}
List<AggregateResult> resultSet = new ArrayList<>();
- for (int i = 0; i < aggResults.size(); i++) {
- if (aggResults.get(i) != null) {
- AggregateResult tempAggResult = finalPaths.get(pathIndex.get(i));
- if (tempAggResult == null) {
- finalPaths.put(pathIndex.get(i), aggResults.get(i));
- } else {
- tempAggResult.merge(aggResults.get(i));
- finalPaths.put(pathIndex.get(i), tempAggResult);
+ List<PartialPath> dupPaths = plan.getDeduplicatedPaths();
+ try {
+ for (int i = 0; i < aggResults.size(); i++) {
+ if (aggResults.get(i) != null) {
+ String transformedPath =
+ generatePartialPathByLevel(dupPaths.get(i).getFullPath(), plan.getLevel());
+ String key = plan.getDeduplicatedAggregations().get(i) + "(" + transformedPath + ")";
+ AggregateResult tempAggResult = finalPaths.get(key);
+ if (tempAggResult == null) {
+ finalPaths.put(key, aggResults.get(i));
+ } else {
+ tempAggResult.merge(aggResults.get(i));
+ finalPaths.put(key, tempAggResult);
+ }
}
}
+ } catch (IllegalPathException e) {
+ throw new QueryProcessException(e.getMessage());
}
for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
index 41290d5..c1a7e14 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.integration.aggregation;
+import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -87,7 +88,7 @@ public class IoTDBAggregationByLevelIT {
int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans = resultSet.getString(TestConstant.sum("root.sg1.*.temperature"));
Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
cnt++;
}
@@ -96,7 +97,7 @@ public class IoTDBAggregationByLevelIT {
statement.execute("select sum(temperature) from root.sg2.* GROUP BY level=1");
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans = resultSet.getString(TestConstant.sum("root.sg2.*.temperature"));
Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
cnt++;
}
@@ -105,7 +106,7 @@ public class IoTDBAggregationByLevelIT {
statement.execute("select sum(temperature) from root.*.* GROUP BY level=0");
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans = resultSet.getString(TestConstant.sum("root.*.*.temperature"));
Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
cnt++;
}
@@ -128,7 +129,7 @@ public class IoTDBAggregationByLevelIT {
int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans = resultSet.getString(TestConstant.avg("root.sg1.*.temperature"));
Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
cnt++;
}
@@ -137,7 +138,7 @@ public class IoTDBAggregationByLevelIT {
statement.execute("select avg(temperature) from root.sg2.* GROUP BY level=1");
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans = resultSet.getString(TestConstant.avg("root.sg2.*.temperature"));
Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
cnt++;
}
@@ -146,7 +147,7 @@ public class IoTDBAggregationByLevelIT {
statement.execute("select avg(temperature) from root.*.* GROUP BY level=0");
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans = resultSet.getString(TestConstant.avg("root.*.*.temperature"));
Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
cnt++;
}
@@ -159,26 +160,38 @@ public class IoTDBAggregationByLevelIT {
public void timeFuncGroupByLevelTest() throws Exception {
String[] retArray =
new String[] {
- "100", "600,700",
+ "8,100", "600,700,2,3",
};
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- statement.execute("select min_time(temperature) from root.*.* GROUP BY level=0");
+ statement.execute(
+ "select count(status), min_time(temperature) from root.*.* GROUP BY level=0");
int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans =
+ resultSet.getString(TestConstant.count("root.*.*.status"))
+ + ","
+ + resultSet.getString(TestConstant.min_time("root.*.*.temperature"));
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
- statement.execute("select max_time(status) from root.sg1.* GROUP BY level=2");
+ statement.execute(
+ "select max_time(status), count(temperature) from root.sg1.* GROUP BY level=2");
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1) + "," + resultSet.getString(2);
+ String ans =
+ resultSet.getString(TestConstant.max_time("root.sg1.d1.status"))
+ + ","
+ + resultSet.getString(TestConstant.max_time("root.sg1.d2.status"))
+ + ","
+ + resultSet.getString(TestConstant.count("root.sg1.d1.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.count("root.sg1.d2.temperature"));
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
@@ -191,26 +204,38 @@ public class IoTDBAggregationByLevelIT {
public void valueFuncGroupByLevelTest() throws Exception {
String[] retArray =
new String[] {
- "61.22", "71.12,62.15",
+ "61.22,125.5", "71.12,62.15,71.12,62.15",
};
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
- statement.execute("select last_value(temperature) from root.*.* GROUP BY level=0");
+ statement.execute(
+ "select last_value(temperature), max_value(temperature) from root.*.* GROUP BY level=0");
int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1);
+ String ans =
+ resultSet.getString(TestConstant.last_value("root.*.*.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.max_value("root.*.*.temperature"));
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
}
- statement.execute("select max_value(temperature) from root.sg1.* GROUP BY level=2");
+ statement.execute(
+ "select last_value(temperature), max_value(temperature) from root.sg1.* GROUP BY level=2");
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(1) + "," + resultSet.getString(2);
+ String ans =
+ resultSet.getString(TestConstant.last_value("root.sg1.d1.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.last_value("root.sg1.d2.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.max_value("root.sg1.d1.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.max_value("root.sg1.d2.temperature"));
Assert.assertEquals(retArray[cnt], ans);
cnt++;
}
@@ -227,7 +252,12 @@ public class IoTDBAggregationByLevelIT {
};
String[] retArray2 =
new String[] {
- "null,null", "null,100", "200,200", "300,null", "null,null", "null,500",
+ "null,null,null,null",
+ "null,100,null,88.24",
+ "200,200,31.685,105.5",
+ "300,null,46.77,null",
+ "null,null,null,null",
+ "null,500,null,125.5",
};
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -238,7 +268,7 @@ public class IoTDBAggregationByLevelIT {
int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(2);
+ String ans = resultSet.getString(TestConstant.sum("root.sg2.*.temperature"));
Assert.assertEquals(retArray1[cnt], ans);
cnt++;
}
@@ -246,10 +276,17 @@ public class IoTDBAggregationByLevelIT {
cnt = 0;
statement.execute(
- "select max_time(temperature) from root.*.* GROUP BY ([0, 600), 100ms), level=1");
+ "select max_time(temperature), avg(temperature) from root.*.* GROUP BY ([0, 600), 100ms), level=1");
try (ResultSet resultSet = statement.getResultSet()) {
while (resultSet.next()) {
- String ans = resultSet.getString(2) + "," + resultSet.getString(3);
+ String ans =
+ resultSet.getString(TestConstant.max_time("root.sg1.*.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.max_time("root.sg2.*.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.avg("root.sg1.*.temperature"))
+ + ","
+ + resultSet.getString(TestConstant.avg("root.sg2.*.temperature"));
Assert.assertEquals(retArray2[cnt], ans);
cnt++;
}
@@ -282,15 +319,6 @@ public class IoTDBAggregationByLevelIT {
} catch (Exception e) {
Assert.assertEquals("Aggregate among unmatched data types", e.getMessage());
}
-
- try {
- planner.parseSQLToPhysicalPlan(
- "select avg(status), sum(temperature) from root.sg2.* GROUP BY level=1");
- } catch (Exception e) {
- Assert.assertEquals(
- "Aggregation function is restricted to one if group by level clause exists",
- e.getMessage());
- }
}
}