You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/02/23 08:38:28 UTC

[iotdb] branch master updated: [IOTDB-1024] Support multiple aggregated measurements for group by level statement (#2714)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aed578  [IOTDB-1024] Support multiple aggregated measurements for group by level statement (#2714)
3aed578 is described below

commit 3aed5786f64bc2f5fc2a4b206747632fc1f4f906
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Tue Feb 23 16:37:59 2021 +0800

    [IOTDB-1024] Support multiple aggregated measurements for group by level statement (#2714)
---
 .../DML Data Manipulation Language.md              |  38 +++--
 .../DML Data Manipulation Language.md              |  36 +++--
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |  35 +++++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  11 ++
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |  11 --
 .../db/query/aggregation/impl/AvgAggrResult.java   |  17 +++
 .../query/dataset/groupby/GroupByTimeDataSet.java  |   6 +-
 .../db/query/executor/AggregationExecutor.java     |  12 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   8 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   | 155 ++++++++++-----------
 .../aggregation/IoTDBAggregationByLevelIT.java     |  86 ++++++++----
 11 files changed, 257 insertions(+), 158 deletions(-)

diff --git a/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md b/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md
index dc25eea..457b693 100644
--- a/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md	
+++ b/docs/UserGuide/Operation Manual/DML Data Manipulation Language.md	
@@ -419,21 +419,41 @@ It costs 0.003s
 ```
 
 
-Assuming another timeseries is added, called "root.ln.wf02.wt01.status".
-To query the number of "status" points of both two paths "root.ln.wf01" and "root.ln.wf02".
+Suppose we add another two timeseries, "root.ln.wf01.wt01.temperature" and "root.ln.wf02.wt01.temperature".
+To query the count and the sum of "temperature" under path "root.ln.*.*", 
+aggregating on level=2, use the following statement:
+
 ```
-select count(status) from root.ln.*.* group by level=2
+select count(temperature), sum(temperature) from root.ln.*.* group by level=2
 ```
 Result:
 
 ```
-+----------------------------+----------------------------+
-|COUNT(root.ln.wf01.*.status)|COUNT(root.ln.wf02.*.status)|
-+----------------------------+----------------------------+
-|                       10080|                       10082|
-+----------------------------+----------------------------+
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+|count(root.ln.wf02.*.temperature)|count(root.ln.wf01.*.temperature)|sum(root.ln.wf02.*.temperature)|sum(root.ln.wf01.*.temperature)|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+|                                8|                                4|                          228.0|              91.83000183105469|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
 Total line number = 1
-It costs 0.003s
+It costs 0.013s
+```
+
+To query the count and the sum of path "root.ln.\*.\*.temperature" aggregated at the "root.ln" level,
+simply set level=1:
+
+```
+select count(temperature), sum(temperature) from root.ln.*.* group by level=1
+```
+Result:
+
+```
++------------------------------+----------------------------+
+|count(root.ln.*.*.temperature)|sum(root.ln.*.*.temperature)|
++------------------------------+----------------------------+
+|                            12|           319.8300018310547|
++------------------------------+----------------------------+
+Total line number = 1
+It costs 0.013s
 ```
 
 All supported aggregation functions are: count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value.
diff --git a/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md b/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md
index db951ff..42b0e1d 100644
--- a/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md	
+++ b/docs/zh/UserGuide/Operation Manual/DML Data Manipulation Language.md	
@@ -469,7 +469,7 @@ select count(status) from root.ln.wf01.* group by level=2
 
 ```
 +----------------------------+
-|COUNT(root.ln.wf01.*.status)|
+|count(root.ln.wf01.*.status)|
 +----------------------------+
 |                       10080|
 +----------------------------+
@@ -478,21 +478,37 @@ It costs 0.003s
 ```
 
 
-假设此时在"root.ln"下面加入名为wf02的子序列,如"root.ln.wf02.wt01.status"。
-需要同时查询"root.ln.wf01"和"root.ln.wf02"下各自status子序列的点个数。则使用查询:
+假设此时添加两条序列,"root.ln.wf01.wt01.temperature" 和 "root.ln.wf02.wt01.temperature"。
+需要同时查询"root.ln.\*.\*.temperature"在第二层级的count聚合结果和sum聚合结果,可以使用下列查询语句:
 ```
-select count(status) from root.ln.*.* group by level=2
+select count(temperature), sum(temperature) from root.ln.*.* group by level=2
 ```
 运行结果:
 
 ```
-+----------------------------+----------------------------+
-|COUNT(root.ln.wf01.*.status)|COUNT(root.ln.wf02.*.status)|
-+----------------------------+----------------------------+
-|                       10080|                       10082|
-+----------------------------+----------------------------+
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+|count(root.ln.wf02.*.temperature)|count(root.ln.wf01.*.temperature)|sum(root.ln.wf02.*.temperature)|sum(root.ln.wf01.*.temperature)|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
+|                                8|                                4|                          228.0|              91.83000183105469|
++---------------------------------+---------------------------------+-------------------------------+-------------------------------+
 Total line number = 1
-It costs 0.003s
+It costs 0.013s
+```
+
+若统计"root.ln.\*.\*"下第一层级的count聚合结果和sum聚合结果,则设置level=1即可:
+```
+select count(temperature), sum(temperature) from root.ln.*.* group by level=1
+```
+运行结果:
+
+```
++------------------------------+----------------------------+
+|count(root.ln.*.*.temperature)|sum(root.ln.*.*.temperature)|
++------------------------------+----------------------------+
+|                            12|           319.8300018310547|
++------------------------------+----------------------------+
+Total line number = 1
+It costs 0.013s
 ```
 
 分层聚合查询也可被用于其他聚合函数,当前所支持的聚合函数为:count, sum, avg, last_value, first_value, min_time, max_time, min_value, max_value
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index 9094c43..95e12da 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -18,10 +18,19 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class AggregationPlan extends RawDataQueryPlan {
 
@@ -33,6 +42,8 @@ public class AggregationPlan extends RawDataQueryPlan {
   private List<String> deduplicatedAggregations = new ArrayList<>();
 
   private int level = -1;
+  // group by level aggregation result path
+  private final Map<String, AggregateResult> levelAggPaths = new LinkedHashMap<>();
 
   public AggregationPlan() {
     super();
@@ -67,4 +78,28 @@ public class AggregationPlan extends RawDataQueryPlan {
   public void setLevel(int level) {
     this.level = level;
   }
+
+  public Map<String, AggregateResult> getAggPathByLevel() throws QueryProcessException {
+    if (!levelAggPaths.isEmpty()) {
+      return levelAggPaths;
+    }
+    List<PartialPath> seriesPaths = getPaths();
+    List<TSDataType> dataTypes = getDataTypes();
+    try {
+      for (int i = 0; i < seriesPaths.size(); i++) {
+        String transformedPath =
+            FilePathUtils.generatePartialPathByLevel(seriesPaths.get(i).getFullPath(), getLevel());
+        String key = getAggregations().get(i) + "(" + transformedPath + ")";
+        if (!levelAggPaths.containsKey(key)) {
+          AggregateResult aggRet =
+              AggregateResultFactory.getAggrResultByName(
+                  getAggregations().get(i), dataTypes.get(i));
+          levelAggPaths.put(key, aggRet);
+        }
+      }
+    } catch (IllegalPathException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+    return levelAggPaths;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 59ccb66..a454661 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -114,6 +114,7 @@ import org.apache.iotdb.db.qp.physical.sys.TracingPlan;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.udf.core.context.UDFContext;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -876,6 +877,16 @@ public class PhysicalGenerator {
                   .getContext()
                   .getColumnName()
               : columnForReader;
+      if (queryPlan instanceof AggregationPlan && ((AggregationPlan) queryPlan).getLevel() >= 0) {
+        String aggregatePath =
+            originalPath.isMeasurementAliasExists()
+                ? FilePathUtils.generatePartialPathByLevel(
+                    originalPath.getFullPathWithAlias(), ((AggregationPlan) queryPlan).getLevel())
+                : FilePathUtils.generatePartialPathByLevel(
+                    originalPath.toString(), ((AggregationPlan) queryPlan).getLevel());
+        columnForDisplay =
+            queryPlan.getAggregations().get(originalIndex) + "(" + aggregatePath + ")";
+      }
       if (!columnForDisplaySet.contains(columnForDisplay)) {
         queryPlan.addPathToIndex(columnForDisplay, queryPlan.getPathToIndex().size());
         if (queryPlan instanceof UDTFPlan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index f74d82e..dea829f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -88,9 +88,6 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     }
 
     checkAggrOfSelectOperator(select);
-    if (((QueryOperator) operator).isGroupByLevel()) {
-      checkAggrOfGroupByLevel(select);
-    }
 
     boolean isAlignByDevice = false;
     if (operator instanceof QueryOperator) {
@@ -165,14 +162,6 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     }
   }
 
-  private void checkAggrOfGroupByLevel(SelectOperator selectOperator)
-      throws LogicalOptimizeException {
-    if (selectOperator.getAggregations().size() != 1) {
-      throw new LogicalOptimizeException(
-          "Aggregation function is restricted to one if group by level clause exists");
-    }
-  }
-
   private void extendListSafely(List<String> source, int index, List<String> target) {
     if (source != null && !source.isEmpty()) {
       target.add(source.get(index));
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index fb30a97..237fa08 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -135,6 +135,23 @@ public class AvgAggrResult extends AggregateResult {
     cnt++;
   }
 
+  public void setAvgResult(TSDataType type, Object val) throws UnSupportedDataTypeException {
+    cnt = 1;
+    switch (type) {
+      case INT32:
+      case INT64:
+      case FLOAT:
+      case DOUBLE:
+        avg = (double) val;
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation AVG : %s", type));
+    }
+  }
+
   @Override
   public boolean hasFinalResult() {
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 46287f0..0a75fc1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -61,8 +60,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
       logger.debug("paths " + this.paths + " level:" + plan.getLevel());
     }
 
-    Map<Integer, String> pathIndex = new HashMap<>();
-    Map<String, AggregateResult> finalPaths = FilePathUtils.getPathByLevel(plan, pathIndex);
+    Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
 
     // get all records from GroupByDataSet, then we merge every record
     if (logger.isDebugEnabled()) {
@@ -72,7 +70,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
       RowRecord rawRecord = dataSet.nextWithoutConstraint();
       RowRecord curRecord = new RowRecord(rawRecord.getTimestamp());
       List<AggregateResult> mergedAggResults =
-          FilePathUtils.mergeRecordByPath(plan, rawRecord, finalPaths, pathIndex);
+          FilePathUtils.mergeRecordByPath(plan, rawRecord, finalPaths);
       for (AggregateResult resultData : mergedAggResults) {
         TSDataType dataType = resultData.getResultDataType();
         curRecord.addField(resultData.getResult(), dataType);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0fd00fa..4c54ac9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -334,7 +334,7 @@ public class AggregationExecutor {
    *
    * @param context query context.
    */
-  public QueryDataSet executeWithValueFilter(QueryContext context, RawDataQueryPlan queryPlan)
+  public QueryDataSet executeWithValueFilter(QueryContext context, AggregationPlan queryPlan)
       throws StorageEngineException, IOException, QueryProcessException {
     int index = 0;
     for (; index < aggregations.size(); index++) {
@@ -426,7 +426,7 @@ public class AggregationExecutor {
    * @param aggregateResultList aggregate result list
    */
   private QueryDataSet constructDataSet(
-      List<AggregateResult> aggregateResultList, RawDataQueryPlan plan)
+      List<AggregateResult> aggregateResultList, AggregationPlan plan)
       throws QueryProcessException {
     RowRecord record = new RowRecord(0);
     for (AggregateResult resultData : aggregateResultList) {
@@ -435,13 +435,11 @@ public class AggregationExecutor {
     }
 
     SingleDataSet dataSet;
-    if (((AggregationPlan) plan).getLevel() >= 0) {
-      Map<Integer, String> pathIndex = new HashMap<>();
-      Map<String, AggregateResult> finalPaths =
-          FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
+    if (plan.getLevel() >= 0) {
+      Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
 
       List<AggregateResult> mergedAggResults =
-          FilePathUtils.mergeRecordByPath(aggregateResultList, finalPaths, pathIndex);
+          FilePathUtils.mergeRecordByPath(plan, aggregateResultList, finalPaths);
 
       List<PartialPath> paths = new ArrayList<>();
       List<TSDataType> dataTypes = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 056109c..55b5a1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -81,7 +81,6 @@ import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
-import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -139,7 +138,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -767,11 +765,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       // same.
       return StaticResps.LAST_RESP.deepCopy();
     } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
-      Map<Integer, String> pathIndex = new HashMap<>();
-      Map<String, AggregateResult> finalPaths =
-          FilePathUtils.getPathByLevel((AggregationPlan) plan, pathIndex);
+      Map<String, AggregateResult> finalPaths = ((AggregationPlan) plan).getAggPathByLevel();
       for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
-        respColumns.add(entry.getValue().getAggregationType() + "(" + entry.getKey() + ")");
+        respColumns.add(entry.getKey());
         columnsTypes.add(entry.getValue().getResultDataType().toString());
       }
     } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 36548d6..9555e9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.db.metadata.MetaUtils;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.aggregation.impl.AvgAggrResult;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -38,7 +40,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 
@@ -124,53 +125,31 @@ public class FilePathUtils {
   }
 
   /**
-   * get paths from group by level, like root.sg1.d2.s0, root.sg1.d1.s0 level=1, return
-   * [root.sg1.*.s0, 0] and pathIndex turns to be [[0, root.sg1.*.s0], [1, root.sg1.*.s0]]
+   * Transform an originalPath to a partial path that satisfies given level. Path nodes exceed the
+   * given level will be replaced by "*", e.g. generatePartialPathByLevel("root.sg.dh.d1.s1", 2)
+   * will return "root.sg.dh.*.s1"
    *
-   * @param plan the original Aggregation Plan
-   * @param pathIndex the mapping from index of aggregations to the result path name
-   * @return
+   * @param originalPath the original timeseries path
+   * @param pathLevel the expected path level
+   * @return result partial path
    */
-  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
-  public static Map<String, AggregateResult> getPathByLevel(
-      AggregationPlan plan, Map<Integer, String> pathIndex) throws QueryProcessException {
-    // pathGroupByLevel -> count
-    Map<String, AggregateResult> finalPaths = new TreeMap<>();
-
-    List<PartialPath> seriesPaths = plan.getDeduplicatedPaths();
-    List<TSDataType> dataTypes = plan.getDeduplicatedDataTypes();
-    for (int i = 0; i < seriesPaths.size(); i++) {
-      String[] tmpPath;
-      try {
-        tmpPath = MetaUtils.splitPathToDetachedPath(seriesPaths.get(i).getFullPath());
-      } catch (IllegalPathException e) {
-        throw new QueryProcessException(e.getMessage());
-      }
-
-      String key;
-      if (tmpPath.length <= plan.getLevel()) {
-        key = seriesPaths.get(i).getFullPath();
+  public static String generatePartialPathByLevel(String originalPath, int pathLevel)
+      throws IllegalPathException {
+    String[] tmpPath = MetaUtils.splitPathToDetachedPath(originalPath);
+    if (tmpPath.length <= pathLevel) {
+      return originalPath;
+    }
+    StringBuilder transformedPath = new StringBuilder();
+    transformedPath.append(tmpPath[0]);
+    for (int k = 1; k < tmpPath.length - 1; k++) {
+      if (k <= pathLevel) {
+        transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]);
       } else {
-        StringBuilder path = new StringBuilder();
-        path.append(tmpPath[0]);
-        for (int k = 1; k < tmpPath.length - 1; k++) {
-          if (k <= plan.getLevel()) {
-            path.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]);
-          } else {
-            path.append(TsFileConstant.PATH_SEPARATOR).append(IoTDBConstant.PATH_WILDCARD);
-          }
-        }
-        path.append(TsFileConstant.PATH_SEPARATOR).append(seriesPaths.get(i).getMeasurement());
-        key = path.toString();
+        transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(IoTDBConstant.PATH_WILDCARD);
       }
-      AggregateResult aggRet =
-          AggregateResultFactory.getAggrResultByName(
-              plan.getAggregations().get(i), dataTypes.get(i));
-      finalPaths.putIfAbsent(key, aggRet);
-      pathIndex.put(i, key);
     }
-
-    return finalPaths;
+    transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[tmpPath.length - 1]);
+    return transformedPath.toString();
   }
 
   /**
@@ -179,14 +158,11 @@ public class FilePathUtils {
    *
    * @param newRecord
    * @param finalPaths
-   * @param pathIndex
    * @return
    */
   public static List<AggregateResult> mergeRecordByPath(
-      AggregationPlan plan,
-      RowRecord newRecord,
-      Map<String, AggregateResult> finalPaths,
-      Map<Integer, String> pathIndex) {
+      AggregationPlan plan, RowRecord newRecord, Map<String, AggregateResult> finalPaths)
+      throws QueryProcessException {
     if (newRecord.getFields().size() < finalPaths.size()) {
       return Collections.emptyList();
     }
@@ -195,43 +171,50 @@ public class FilePathUtils {
       if (newRecord.getFields().get(i) == null) {
         aggregateResultList.add(
             AggregateResultFactory.getAggrResultByName(
-                plan.getAggregations().get(i), plan.getDeduplicatedDataTypes().get(i)));
+                plan.getDeduplicatedAggregations().get(i), plan.getDeduplicatedDataTypes().get(i)));
       } else {
         TSDataType dataType = newRecord.getFields().get(i).getDataType();
         AggregateResult aggRet =
-            AggregateResultFactory.getAggrResultByName(plan.getAggregations().get(i), dataType);
-        switch (dataType) {
-          case TEXT:
-            aggRet.setBinaryValue(newRecord.getFields().get(i).getBinaryV());
-            break;
-          case INT32:
-            aggRet.setIntValue(newRecord.getFields().get(i).getIntV());
-            break;
-          case INT64:
-            aggRet.setLongValue(newRecord.getFields().get(i).getLongV());
-            break;
-          case FLOAT:
-            aggRet.setFloatValue(newRecord.getFields().get(i).getFloatV());
-            break;
-          case DOUBLE:
-            aggRet.setDoubleValue(newRecord.getFields().get(i).getDoubleV());
-            break;
-          case BOOLEAN:
-            aggRet.setBooleanValue(newRecord.getFields().get(i).getBoolV());
-            break;
-          default:
-            throw new UnSupportedDataTypeException(dataType.toString());
+            AggregateResultFactory.getAggrResultByName(
+                plan.getDeduplicatedAggregations().get(i), dataType);
+        if (aggRet.getAggregationType().equals(AggregationType.AVG)) {
+          ((AvgAggrResult) aggRet)
+              .setAvgResult(dataType, newRecord.getFields().get(i).getDoubleV());
+        } else {
+          switch (dataType) {
+            case TEXT:
+              aggRet.setBinaryValue(newRecord.getFields().get(i).getBinaryV());
+              break;
+            case INT32:
+              aggRet.setIntValue(newRecord.getFields().get(i).getIntV());
+              break;
+            case INT64:
+              aggRet.setLongValue(newRecord.getFields().get(i).getLongV());
+              break;
+            case FLOAT:
+              aggRet.setFloatValue(newRecord.getFields().get(i).getFloatV());
+              break;
+            case DOUBLE:
+              aggRet.setDoubleValue(newRecord.getFields().get(i).getDoubleV());
+              break;
+            case BOOLEAN:
+              aggRet.setBooleanValue(newRecord.getFields().get(i).getBoolV());
+              break;
+            default:
+              throw new UnSupportedDataTypeException(dataType.toString());
+          }
         }
         aggregateResultList.add(aggRet);
       }
     }
-    return mergeRecordByPath(aggregateResultList, finalPaths, pathIndex);
+    return mergeRecordByPath(plan, aggregateResultList, finalPaths);
   }
 
   public static List<AggregateResult> mergeRecordByPath(
+      AggregationPlan plan,
       List<AggregateResult> aggResults,
-      Map<String, AggregateResult> finalPaths,
-      Map<Integer, String> pathIndex) {
+      Map<String, AggregateResult> finalPaths)
+      throws QueryProcessException {
     if (aggResults.size() < finalPaths.size()) {
       return Collections.emptyList();
     }
@@ -240,16 +223,24 @@ public class FilePathUtils {
     }
 
     List<AggregateResult> resultSet = new ArrayList<>();
-    for (int i = 0; i < aggResults.size(); i++) {
-      if (aggResults.get(i) != null) {
-        AggregateResult tempAggResult = finalPaths.get(pathIndex.get(i));
-        if (tempAggResult == null) {
-          finalPaths.put(pathIndex.get(i), aggResults.get(i));
-        } else {
-          tempAggResult.merge(aggResults.get(i));
-          finalPaths.put(pathIndex.get(i), tempAggResult);
+    List<PartialPath> dupPaths = plan.getDeduplicatedPaths();
+    try {
+      for (int i = 0; i < aggResults.size(); i++) {
+        if (aggResults.get(i) != null) {
+          String transformedPath =
+              generatePartialPathByLevel(dupPaths.get(i).getFullPath(), plan.getLevel());
+          String key = plan.getDeduplicatedAggregations().get(i) + "(" + transformedPath + ")";
+          AggregateResult tempAggResult = finalPaths.get(key);
+          if (tempAggResult == null) {
+            finalPaths.put(key, aggResults.get(i));
+          } else {
+            tempAggResult.merge(aggResults.get(i));
+            finalPaths.put(key, tempAggResult);
+          }
         }
       }
+    } catch (IllegalPathException e) {
+      throw new QueryProcessException(e.getMessage());
     }
 
     for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
index 41290d5..c1a7e14 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/aggregation/IoTDBAggregationByLevelIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.integration.aggregation;
 
+import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.qp.Planner;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
@@ -87,7 +88,7 @@ public class IoTDBAggregationByLevelIT {
       int cnt = 0;
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans = resultSet.getString(TestConstant.sum("root.sg1.*.temperature"));
           Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
           cnt++;
         }
@@ -96,7 +97,7 @@ public class IoTDBAggregationByLevelIT {
       statement.execute("select sum(temperature) from root.sg2.* GROUP BY level=1");
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans = resultSet.getString(TestConstant.sum("root.sg2.*.temperature"));
           Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
           cnt++;
         }
@@ -105,7 +106,7 @@ public class IoTDBAggregationByLevelIT {
       statement.execute("select sum(temperature) from root.*.* GROUP BY level=0");
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans = resultSet.getString(TestConstant.sum("root.*.*.temperature"));
           Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
           cnt++;
         }
@@ -128,7 +129,7 @@ public class IoTDBAggregationByLevelIT {
       int cnt = 0;
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans = resultSet.getString(TestConstant.avg("root.sg1.*.temperature"));
           Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
           cnt++;
         }
@@ -137,7 +138,7 @@ public class IoTDBAggregationByLevelIT {
       statement.execute("select avg(temperature) from root.sg2.* GROUP BY level=1");
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans = resultSet.getString(TestConstant.avg("root.sg2.*.temperature"));
           Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
           cnt++;
         }
@@ -146,7 +147,7 @@ public class IoTDBAggregationByLevelIT {
       statement.execute("select avg(temperature) from root.*.* GROUP BY level=0");
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans = resultSet.getString(TestConstant.avg("root.*.*.temperature"));
           Assert.assertEquals(retArray[cnt], Double.parseDouble(ans), DOUBLE_PRECISION);
           cnt++;
         }
@@ -159,26 +160,38 @@ public class IoTDBAggregationByLevelIT {
   public void timeFuncGroupByLevelTest() throws Exception {
     String[] retArray =
         new String[] {
-          "100", "600,700",
+          "8,100", "600,700,2,3",
         };
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
-      statement.execute("select min_time(temperature) from root.*.* GROUP BY level=0");
+      statement.execute(
+          "select count(status), min_time(temperature) from root.*.* GROUP BY level=0");
 
       int cnt = 0;
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans =
+              resultSet.getString(TestConstant.count("root.*.*.status"))
+                  + ","
+                  + resultSet.getString(TestConstant.min_time("root.*.*.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }
       }
 
-      statement.execute("select max_time(status) from root.sg1.* GROUP BY level=2");
+      statement.execute(
+          "select max_time(status), count(temperature) from root.sg1.* GROUP BY level=2");
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1) + "," + resultSet.getString(2);
+          String ans =
+              resultSet.getString(TestConstant.max_time("root.sg1.d1.status"))
+                  + ","
+                  + resultSet.getString(TestConstant.max_time("root.sg1.d2.status"))
+                  + ","
+                  + resultSet.getString(TestConstant.count("root.sg1.d1.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.count("root.sg1.d2.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }
@@ -191,26 +204,38 @@ public class IoTDBAggregationByLevelIT {
   public void valueFuncGroupByLevelTest() throws Exception {
     String[] retArray =
         new String[] {
-          "61.22", "71.12,62.15",
+          "61.22,125.5", "71.12,62.15,71.12,62.15",
         };
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
-      statement.execute("select last_value(temperature) from root.*.* GROUP BY level=0");
+      statement.execute(
+          "select last_value(temperature), max_value(temperature) from root.*.* GROUP BY level=0");
 
       int cnt = 0;
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1);
+          String ans =
+              resultSet.getString(TestConstant.last_value("root.*.*.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.max_value("root.*.*.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }
       }
 
-      statement.execute("select max_value(temperature) from root.sg1.* GROUP BY level=2");
+      statement.execute(
+          "select last_value(temperature), max_value(temperature) from root.sg1.* GROUP BY level=2");
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(1) + "," + resultSet.getString(2);
+          String ans =
+              resultSet.getString(TestConstant.last_value("root.sg1.d1.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.last_value("root.sg1.d2.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.max_value("root.sg1.d1.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.max_value("root.sg1.d2.temperature"));
           Assert.assertEquals(retArray[cnt], ans);
           cnt++;
         }
@@ -227,7 +252,12 @@ public class IoTDBAggregationByLevelIT {
         };
     String[] retArray2 =
         new String[] {
-          "null,null", "null,100", "200,200", "300,null", "null,null", "null,500",
+          "null,null,null,null",
+          "null,100,null,88.24",
+          "200,200,31.685,105.5",
+          "300,null,46.77,null",
+          "null,null,null,null",
+          "null,500,null,125.5",
         };
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -238,7 +268,7 @@ public class IoTDBAggregationByLevelIT {
       int cnt = 0;
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(2);
+          String ans = resultSet.getString(TestConstant.sum("root.sg2.*.temperature"));
           Assert.assertEquals(retArray1[cnt], ans);
           cnt++;
         }
@@ -246,10 +276,17 @@ public class IoTDBAggregationByLevelIT {
 
       cnt = 0;
       statement.execute(
-          "select max_time(temperature) from root.*.* GROUP BY ([0, 600), 100ms), level=1");
+          "select max_time(temperature), avg(temperature) from root.*.* GROUP BY ([0, 600), 100ms), level=1");
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
-          String ans = resultSet.getString(2) + "," + resultSet.getString(3);
+          String ans =
+              resultSet.getString(TestConstant.max_time("root.sg1.*.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.max_time("root.sg2.*.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.avg("root.sg1.*.temperature"))
+                  + ","
+                  + resultSet.getString(TestConstant.avg("root.sg2.*.temperature"));
           Assert.assertEquals(retArray2[cnt], ans);
           cnt++;
         }
@@ -282,15 +319,6 @@ public class IoTDBAggregationByLevelIT {
       } catch (Exception e) {
         Assert.assertEquals("Aggregate among unmatched data types", e.getMessage());
       }
-
-      try {
-        planner.parseSQLToPhysicalPlan(
-            "select avg(status), sum(temperature) from root.sg2.* GROUP BY level=1");
-      } catch (Exception e) {
-        Assert.assertEquals(
-            "Aggregation function is restricted to one if group by level clause exists",
-            e.getMessage());
-      }
     }
   }