You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/11/29 03:08:41 UTC

[iotdb] 01/01: IOTDB-1823 group by multi level

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch group_by_multi_level_1129
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fdfb4ae642f0942a75f5549cf8716ac81175ea70
Author: chaow <94...@qq.com>
AuthorDate: Mon Nov 29 11:07:36 2021 +0800

    IOTDB-1823 group by multi level
---
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |   4 +-
 client-cpp/pom.xml                                 |   4 +-
 compile-tools/pom.xml                              |   6 +-
 distribution/pom.xml                               |   2 +-
 example/client-cpp-example/pom.xml                 |   2 +-
 example/udf/pom.xml                                |   2 +-
 grafana/pom.xml                                    |   2 +-
 jdbc/pom.xml                                       |   2 +-
 pom.xml                                            |   8 +-
 .../db/qp/logical/crud/GroupByLevelController.java | 116 ++++++++++++++
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |  31 ++--
 .../iotdb/db/qp/logical/crud/SFWOperator.java      |  10 ++
 .../iotdb/db/qp/logical/crud/SelectOperator.java   |  40 +++++
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |  63 ++++----
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  18 ++-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   8 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java | 172 +++++++++++++--------
 .../query/dataset/groupby/GroupByTimeDataSet.java  |   3 +-
 .../db/query/executor/AggregationExecutor.java     |   2 +-
 .../iotdb/db/query/executor/QueryRouter.java       |   7 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |   2 +-
 .../org/apache/iotdb/db/utils/AggregateUtils.java  |  71 +++++----
 .../dataset/groupby/GroupByLevelDataSetTest.java   |  10 ++
 23 files changed, 420 insertions(+), 165 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index a0c49c6..0062431 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -322,7 +322,7 @@ groupByTimeClause
             COMMA DURATION
             (COMMA DURATION)?
             RR_BRACKET
-            COMMA LEVEL OPERATOR_EQ INT
+            COMMA LEVEL OPERATOR_EQ INT (COMMA INT)*
     ;
 
 groupByFillClause
@@ -334,7 +334,7 @@ groupByFillClause
      ;
 
 groupByLevelClause
-    : GROUP BY LEVEL OPERATOR_EQ INT
+    : GROUP BY LEVEL OPERATOR_EQ INT (COMMA INT)*
     ;
 
 typeClause
diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index c24caa1..a44b847 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -98,8 +98,8 @@
                 <cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
                 <thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
                 <iotdb.server.script>start-server.bat</iotdb.server.script>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index 188ac3a..c046288 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
         <cmake-version>3.17.3</cmake-version>
         <openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
         <bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
-        <cmake.build.type />
+        <cmake.build.type/>
     </properties>
     <modules>
         <module>thrift</module>
@@ -114,8 +114,8 @@
                 <thrift.make.executable>make</thrift.make.executable>
                 <thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
                 <gradlew.executable>gradlew.bat</gradlew.executable>
-                <boost.include.dir />
-                <boost.library.dir />
+                <boost.include.dir/>
+                <boost.library.dir/>
             </properties>
         </profile>
     </profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ef366ac..0ff89bc 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
     </parent>
     <artifactId>iotdb-distribution</artifactId>
     <name>IoTDB Distribution</name>
-    <modules />
+    <modules/>
     <build>
         <plugins>
             <plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index cb6069a..d7cd7ee 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -69,7 +69,7 @@
             <properties>
                 <cmake.generator>Visual Studio 16 2019</cmake.generator>
                 <cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
-                <boost.include.dir />
+                <boost.include.dir/>
             </properties>
         </profile>
         <profile>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index c051c61..62c85c7 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -77,7 +77,7 @@
                         <importOrder>
                             <order>org.apache.iotdb,,javax,java,\#</order>
                         </importOrder>
-                        <removeUnusedImports />
+                        <removeUnusedImports/>
                     </java>
                 </configuration>
                 <executions>
diff --git a/grafana/pom.xml b/grafana/pom.xml
index 8a0c7c2..0b39db1 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -170,7 +170,7 @@
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         <resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 6a5eb43..5cb73c0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -198,7 +198,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                     </pluginExecutions>
diff --git a/pom.xml b/pom.xml
index 7e59148..d64a674 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
         <sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
         <!-- By default, the argLine is empty-->
         <gson.version>2.8.6</gson.version>
-        <argLine />
+        <argLine/>
         <!-- whether enable compiling the cpp client-->
         <client-cpp>false</client-cpp>
         <!-- disable enforcer by default-->
@@ -688,7 +688,7 @@
                             <importOrder>
                                 <order>org.apache.iotdb,,javax,java,\#</order>
                             </importOrder>
-                            <removeUnusedImports />
+                            <removeUnusedImports/>
                         </java>
                         <lineEndings>UNIX</lineEndings>
                     </configuration>
@@ -760,7 +760,7 @@
                         <phase>validate</phase>
                         <configuration>
                             <rules>
-                                <dependencyConvergence />
+                                <dependencyConvergence/>
                             </rules>
                         </configuration>
                         <goals>
@@ -806,7 +806,7 @@
                                 </requireJavaVersion>
                                 <!-- Disabled for now as it breaks the ability to build single modules -->
                                 <!--reactorModuleConvergence/-->
-                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+                                <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
                             </rules>
                         </configuration>
                     </execution>
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByLevelController.java
new file mode 100644
index 0000000..c9ade35
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByLevelController.java
@@ -0,0 +1,116 @@
+package org.apache.iotdb.db.qp.logical.crud;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used to control the number of result columns of a group by level query. For
+ * example, with selected series [root.sg.d1.s1, root.sg.d2.s1, root.sg2.d1.s1] and level = 1, the
+ * result columns will be [root.sg.*.s1, root.sg2.*.s1]. sLimit and sOffset are applied to these
+ * grouped result columns rather than to the selected series.
+ */
+public class GroupByLevelController {
+
+  private final int seriesLimit;
+  private int seriesOffset;
+  Set<String> limitPaths;
+  Set<String> offsetPaths;
+  private final int[] levels;
+  int prevSize = 0;
+  /** count(root.sg.d1.s1) with level = 1 -> count(root.sg.*.s1) */
+  private Map<String, String> groupedPathMap;
+
+  public GroupByLevelController(int seriesLimit, int seriesOffset, int[] levels) {
+    this.seriesLimit = seriesLimit;
+    this.seriesOffset = seriesOffset;
+    this.limitPaths = seriesLimit > 0 ? new HashSet<>() : null;
+    this.offsetPaths = seriesOffset > 0 ? new HashSet<>() : null;
+    this.groupedPathMap = new LinkedHashMap<>();
+    this.levels = levels;
+  }
+
+  public String getGroupedPath(String rawPath) {
+    return groupedPathMap.get(rawPath);
+  }
+
+  public void control(List<PartialPath> resultColumns, boolean isCountStar, String functionName) {
+    Iterator<PartialPath> iterator = resultColumns.iterator();
+    for (int i = 0; i < prevSize; i++) {
+      iterator.next();
+    }
+    while (iterator.hasNext()) {
+      PartialPath resultColumn = iterator.next();
+      String groupedPath = generatePartialPathByLevel(resultColumn, levels, isCountStar);
+      String rawPath = String.format("%s(%s)", functionName, resultColumn.getFullPath());
+      String pathWithFunction = String.format("%s(%s)", functionName, groupedPath);
+
+      if (seriesLimit == 0 && seriesOffset == 0) {
+        groupedPathMap.put(rawPath, pathWithFunction);
+      } else {
+        if (seriesOffset > 0 && offsetPaths != null) {
+          offsetPaths.add(pathWithFunction);
+          if (offsetPaths.size() <= seriesOffset) {
+            iterator.remove();
+            if (offsetPaths.size() == seriesOffset) {
+              seriesOffset = 0;
+            }
+          }
+        } else if (offsetPaths == null || !offsetPaths.contains(pathWithFunction)) {
+          limitPaths.add(pathWithFunction);
+          if (seriesLimit > 0 && limitPaths.size() > seriesLimit) {
+            iterator.remove();
+            limitPaths.remove(pathWithFunction);
+          } else {
+            groupedPathMap.put(rawPath, pathWithFunction);
+          }
+        } else {
+          iterator.remove();
+        }
+      }
+    }
+    prevSize = resultColumns.size();
+  }
+
+  /**
+   * Transform an originalPath to a partial path that satisfies the given levels. Path nodes that
+   * are not at one of the given levels will be replaced by "*", except the sensor level, e.g.
+   * for path "root.sg.dh.d1.s1" and levels [2] the result will be "root.*.dh.*.s1".
+   *
+   * <p>Especially, if count(*), then the sensor level will be replaced by "*" too.
+   *
+   * @return result partial path
+   */
+  public static String generatePartialPathByLevel(
+      PartialPath originalPath, int[] pathLevels, boolean isCountStar) {
+    String[] nodes = originalPath.getNodes();
+    Set<Integer> levelSet = new HashSet<>();
+    for (int level : pathLevels) {
+      levelSet.add(level);
+    }
+
+    StringBuilder transformedPath = new StringBuilder();
+    transformedPath.append(nodes[0]).append(TsFileConstant.PATH_SEPARATOR);
+    for (int k = 1; k < nodes.length - 1; k++) {
+      if (levelSet.contains(k)) {
+        transformedPath.append(nodes[k]);
+      } else {
+        transformedPath.append(IoTDBConstant.PATH_WILDCARD);
+      }
+      transformedPath.append(TsFileConstant.PATH_SEPARATOR);
+    }
+    if (isCountStar) {
+      transformedPath.append(IoTDBConstant.PATH_WILDCARD);
+    } else {
+      transformedPath.append(nodes[nodes.length - 1]);
+    }
+    return transformedPath.toString();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index c640470..084627c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -43,9 +43,6 @@ public class QueryOperator extends SFWOperator {
   private Map<TSDataType, IFill> fillTypes;
   private boolean isFill = false;
 
-  private boolean isGroupByLevel = false;
-  private int level = -1;
-
   private int rowLimit = 0;
   private int rowOffset = 0;
   private int seriesLimit = 0;
@@ -105,12 +102,24 @@ public class QueryOperator extends SFWOperator {
     this.fillTypes = fillTypes;
   }
 
-  public boolean isGroupByLevel() {
-    return isGroupByLevel;
+  public int[] getLevels() {
+    if (getSelectOperator() != null) {
+      return getSelectOperator().getLevels();
+    }
+    return null;
   }
 
-  public void setGroupByLevel(boolean isGroupBy) {
-    this.isGroupByLevel = isGroupBy;
+  public void setLevels(int[] levels) {
+    if (getSelectOperator() != null) {
+      getSelectOperator().setLevels(levels);
+    }
+  }
+
+  public boolean isGroupByLevel() {
+    if (getSelectOperator() != null) {
+      return getSelectOperator().isGroupByLevel();
+    }
+    return false;
   }
 
   public boolean isLeftCRightO() {
@@ -209,14 +218,6 @@ public class QueryOperator extends SFWOperator {
     this.isAlignByTime = isAlignByTime;
   }
 
-  public int getLevel() {
-    return level;
-  }
-
-  public void setLevel(int level) {
-    this.level = level;
-  }
-
   public boolean isGroupByTime() {
     return isGroupByTime;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java
index 14d510b..5eb3820 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java
@@ -95,4 +95,14 @@ public abstract class SFWOperator extends RootOperator {
   public boolean isLastQuery() {
     return lastQuery;
   }
+
+  public boolean isCountStar() {
+    return selectOperator != null && selectOperator.isCountStar();
+  }
+
+  public void checkCountStar() {
+    if (selectOperator != null) {
+      selectOperator.checkCountStar();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
index 5b34ea5..eced489 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iotdb.db.qp.logical.crud;
 
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.query.udf.core.context.UDFContext;
 
@@ -38,6 +40,10 @@ public final class SelectOperator extends Operator {
   private boolean udfQuery;
   private boolean hasBuiltinAggregation;
 
+  private GroupByLevelController groupByLevelController;
+  private boolean isCountStar;
+  private int[] levels;
+
   /** init with tokenIntType, default operatorType is <code>OperatorType.SELECT</code>. */
   public SelectOperator(int tokenIntType, ZoneId zoneId) {
     super(tokenIntType);
@@ -113,4 +119,38 @@ public final class SelectOperator extends Operator {
   public void setUdfList(List<UDFContext> udfList) {
     this.udfList = udfList;
   }
+
+  public boolean isCountStar() {
+    return isCountStar;
+  }
+
+  public void checkCountStar() {
+    if (hasAggregation()
+        && getAggregations().size() == 1
+        && getAggregations().get(0).equals(SQLConstant.COUNT)
+        && getSuffixPaths().size() == 1
+        && getSuffixPaths().get(0).equals(IoTDBConstant.PATH_WILDCARD)) {
+      isCountStar = true;
+    }
+  }
+
+  public int[] getLevels() {
+    return levels;
+  }
+
+  public void setLevels(int[] levels) {
+    this.levels = levels;
+  }
+
+  public boolean isGroupByLevel() {
+    return !(levels == null || levels.length == 0);
+  }
+
+  public GroupByLevelController getGroupByLevelController() {
+    return groupByLevelController;
+  }
+
+  public void setGroupByLevelController(GroupByLevelController groupByLevelController) {
+    this.groupByLevelController = groupByLevelController;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index d2c8a5d..cb4671a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.GroupByLevelController;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.utils.AggregateUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
@@ -40,8 +40,10 @@ public class AggregationPlan extends RawDataQueryPlan {
 
   private List<String> aggregations = new ArrayList<>();
   private List<String> deduplicatedAggregations = new ArrayList<>();
+  private GroupByLevelController groupByLevelController;
+  private boolean isCountStar;
 
-  private int level = -1;
+  private int[] levels;
   // group by level aggregation result path
   private final Map<String, AggregateResult> levelAggPaths = new LinkedHashMap<>();
 
@@ -71,12 +73,24 @@ public class AggregationPlan extends RawDataQueryPlan {
     this.deduplicatedAggregations = deduplicatedAggregations;
   }
 
-  public int getLevel() {
-    return level;
+  public int[] getLevels() {
+    return levels;
   }
 
-  public void setLevel(int level) {
-    this.level = level;
+  public void setLevels(int[] levels) {
+    this.levels = levels;
+  }
+
+  public boolean isGroupByLevel() {
+    return !(levels == null || levels.length == 0);
+  }
+
+  public boolean isCountStar() {
+    return isCountStar;
+  }
+
+  public void setCountStar(boolean countStar) {
+    isCountStar = countStar;
   }
 
   public Map<String, AggregateResult> getAggPathByLevel() throws QueryProcessException {
@@ -85,20 +99,14 @@ public class AggregationPlan extends RawDataQueryPlan {
     }
     List<PartialPath> seriesPaths = getPaths();
     List<TSDataType> dataTypes = getDataTypes();
-    try {
-      for (int i = 0; i < seriesPaths.size(); i++) {
-        String transformedPath =
-            AggregateUtils.generatePartialPathByLevel(seriesPaths.get(i).getFullPath(), getLevel());
-        String key = getAggregations().get(i) + "(" + transformedPath + ")";
-        if (!levelAggPaths.containsKey(key)) {
-          AggregateResult aggRet =
-              AggregateResultFactory.getAggrResultByName(
-                  getAggregations().get(i), dataTypes.get(i));
-          levelAggPaths.put(key, aggRet);
-        }
+    for (int i = 0; i < seriesPaths.size(); i++) {
+      String rawPath = getAggregations().get(i) + "(" + seriesPaths.get(i).getFullPath() + ")";
+      String key = groupByLevelController.getGroupedPath(rawPath);
+      if (!levelAggPaths.containsKey(key)) {
+        AggregateResult aggRet =
+            AggregateResultFactory.getAggrResultByName(getAggregations().get(i), dataTypes.get(i));
+        levelAggPaths.put(key, aggRet);
       }
-    } catch (IllegalPathException e) {
-      throw new QueryProcessException(e.getMessage());
     }
     return levelAggPaths;
   }
@@ -124,14 +132,17 @@ public class AggregationPlan extends RawDataQueryPlan {
   public String getColumnForDisplay(String columnForReader, int pathIndex)
       throws IllegalPathException {
     String columnForDisplay = columnForReader;
-    if (level >= 0) {
-      PartialPath path = paths.get(pathIndex);
-      String aggregatePath =
-          path.isMeasurementAliasExists()
-              ? AggregateUtils.generatePartialPathByLevel(path.getFullPathWithAlias(), level)
-              : AggregateUtils.generatePartialPathByLevel(path.toString(), level);
-      columnForDisplay = aggregations.get(pathIndex) + "(" + aggregatePath + ")";
+    if (isGroupByLevel()) {
+      columnForDisplay = groupByLevelController.getGroupedPath(columnForDisplay);
     }
     return columnForDisplay;
   }
+
+  public GroupByLevelController getGroupByLevelController() {
+    return groupByLevelController;
+  }
+
+  public void setGroupByLevelController(GroupByLevelController groupByLevelController) {
+    this.groupByLevelController = groupByLevelController;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 89fdf2e..ded058f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -1377,8 +1377,17 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
     if (!queryOp.hasAggregation()) {
       throw new SQLParserException("There is no aggregation function with group by query");
     }
-    queryOp.setGroupByLevel(true);
-    queryOp.setLevel(Integer.parseInt(ctx.INT().getText()));
+    setLevels(queryOp, ctx.LEVEL(), ctx.INT());
+  }
+
+  private void setLevels(QueryOperator queryOp, TerminalNode level, List<TerminalNode> anInt) {
+    if (level != null && anInt != null) {
+      int[] levels = new int[anInt.size()];
+      for (int i = 0; i < anInt.size(); i++) {
+        levels[i] = Integer.parseInt(anInt.get(i).getText());
+      }
+      queryOp.setLevels(levels);
+    }
   }
 
   public void parseFillClause(FillClauseContext ctx, QueryOperator queryOp) {
@@ -1489,10 +1498,7 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
 
     parseTimeInterval(ctx.timeInterval(), queryOp);
 
-    if (ctx.INT() != null) {
-      queryOp.setGroupByLevel(true);
-      queryOp.setLevel(Integer.parseInt(ctx.INT().getText()));
-    }
+    setLevels(queryOp, ctx.LEVEL(), ctx.INT());
   }
 
   private void parseGroupByFillClause(GroupByFillClauseContext ctx, QueryOperator queryOp) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 8802493..0cf097c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -499,7 +499,11 @@ public class PhysicalGenerator {
           }
         }
       } else if (queryOperator.isGroupByLevel()) {
-        ((AggregationPlan) queryPlan).setLevel(queryOperator.getLevel());
+        ((AggregationPlan) queryPlan)
+            .setGroupByLevelController(
+                queryOperator.getSelectOperator().getGroupByLevelController());
+        ((AggregationPlan) queryPlan).setCountStar(queryOperator.isCountStar());
+        ((AggregationPlan) queryPlan).setLevels(queryOperator.getLevels());
         try {
           if (!verifyAllAggregationDataTypesEqual(queryOperator)) {
             throw new QueryProcessException("Aggregate among unmatched data types");
@@ -618,7 +622,7 @@ public class PhysicalGenerator {
     } else if (queryPlan instanceof FillQueryPlan) {
       alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
     } else if (queryPlan instanceof AggregationPlan) {
-      if (((AggregationPlan) queryPlan).getLevel() >= 0) {
+      if (((AggregationPlan) queryPlan).isGroupByLevel()) {
         throw new QueryProcessException("group by level does not support align by device now.");
       }
       alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 4432eb5..cd1bb24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 import org.apache.iotdb.db.qp.logical.crud.FromOperator;
 import org.apache.iotdb.db.qp.logical.crud.FunctionOperator;
+import org.apache.iotdb.db.qp.logical.crud.GroupByLevelController;
 import org.apache.iotdb.db.qp.logical.crud.InOperator;
 import org.apache.iotdb.db.qp.logical.crud.LikeOperator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -92,6 +94,7 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
         logger.warn(WARNING_NO_SUFFIX_PATHS);
         return operator;
       }
+      sfwOperator.checkCountStar();
     }
 
     checkAggrOfSelectOperator(select);
@@ -195,6 +198,16 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     List<UDFContext> originUdfList = selectOperator.getUdfList();
     List<UDFContext> afterConcatUdfList = new ArrayList<>();
 
+    // TODO how to get the final result column of group by level
+
+    // 1. group by level, so need to get all measurements for each function one by one
+
+    // 2. group by level, and apply soffset and slimit
+
+    // 3. continue to step 1, or go to step 4 when slimit is reached or the function list ends
+
+    // 4. put all satisfying paths into the operator and the aggregations
+
     for (int i = 0; i < suffixPaths.size(); i++) {
       // selectPath cannot start with ROOT, which is guaranteed by TSParser
       PartialPath selectPath = suffixPaths.get(i);
@@ -383,81 +396,114 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
     List<String> newAggregations = new ArrayList<>();
     List<UDFContext> newUdfList = new ArrayList<>();
 
-    for (int i = 0; i < afterConcatPaths.size(); i++) {
-      try {
-        PartialPath afterConcatPath = afterConcatPaths.get(i);
-
-        if (afterConcatPath == null) { // udf
-          UDFContext originUdf = afterConcatUdfList.get(i);
-          List<PartialPath> originPaths = originUdf.getPaths();
-          List<List<PartialPath>> extendedPaths = new ArrayList<>();
-
-          boolean atLeastOneSeriesNotExisted = false;
-          for (PartialPath originPath : originPaths) {
-            List<PartialPath> actualPaths = removeWildcard(originPath, 0, 0).left;
-            if (actualPaths.isEmpty()) {
-              atLeastOneSeriesNotExisted = true;
-              break;
-            }
-            checkAndSetTsAlias(actualPaths, originPath);
-            extendedPaths.add(actualPaths);
-          }
-          if (atLeastOneSeriesNotExisted) {
-            continue;
-          }
-
-          List<List<PartialPath>> actualPaths = new ArrayList<>();
-          cartesianProduct(extendedPaths, actualPaths, 0, new ArrayList<>());
+    if (selectOperator.isGroupByLevel()) {
+      GroupByLevelController groupByLevelController =
+          new GroupByLevelController(limit, offset, selectOperator.getLevels());
+      selectOperator.setGroupByLevelController(groupByLevelController);
+      // 1. group by level, so need to get all measurements for each function one by one
+      List<PartialPath> resultColumns = new LinkedList<>();
+      int preSize = 0;
+      for (int i = 0; i < afterConcatPaths.size(); i++) {
+        try {
+          PartialPath afterConcatPath = afterConcatPaths.get(i);
+
+          Pair<List<PartialPath>, Integer> pair = removeWildcard(afterConcatPath, 0, 0);
+          List<PartialPath> truePaths = pair.left;
+          checkAndSetTsAlias(truePaths, afterConcatPath);
+          resultColumns.addAll(truePaths);
+
+          groupByLevelController.control(
+              resultColumns, selectOperator.isCountStar(), afterConcatAggregations.get(i));
+
+        } catch (MetadataException e) {
+          throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
+        }
 
-          for (List<PartialPath> actualPath : actualPaths) {
-            if (offset != 0) {
-              --offset;
+        for (int j = preSize; j < resultColumns.size(); j++) {
+          extendListSafely(afterConcatAggregations, i, newAggregations);
+          newUdfList.add(null);
+        }
+        preSize = resultColumns.size();
+      }
+      consumed = resultColumns.size();
+      newSuffixPathList.addAll(resultColumns);
+    } else {
+      for (int i = 0; i < afterConcatPaths.size(); i++) {
+        try {
+          PartialPath afterConcatPath = afterConcatPaths.get(i);
+
+          if (afterConcatPath == null) { // udf
+            UDFContext originUdf = afterConcatUdfList.get(i);
+            List<PartialPath> originPaths = originUdf.getPaths();
+            List<List<PartialPath>> extendedPaths = new ArrayList<>();
+
+            boolean atLeastOneSeriesNotExisted = false;
+            for (PartialPath originPath : originPaths) {
+              List<PartialPath> actualPaths = removeWildcard(originPath, 0, 0).left;
+              if (actualPaths.isEmpty()) {
+                atLeastOneSeriesNotExisted = true;
+                break;
+              }
+              checkAndSetTsAlias(actualPaths, originPath);
+              extendedPaths.add(actualPaths);
+            }
+            if (atLeastOneSeriesNotExisted) {
               continue;
-            } else if (limit != 0) {
-              --limit;
-            } else {
-              break;
             }
 
-            newSuffixPathList.add(null);
-            extendListSafely(afterConcatAggregations, i, newAggregations);
+            List<List<PartialPath>> actualPaths = new ArrayList<>();
+            cartesianProduct(extendedPaths, actualPaths, 0, new ArrayList<>());
 
-            newUdfList.add(
-                new UDFContext(originUdf.getName(), originUdf.getAttributes(), actualPath));
-          }
-        } else { // non-udf
-          Pair<List<PartialPath>, Integer> pair = removeWildcard(afterConcatPath, limit, offset);
-          List<PartialPath> actualPaths = pair.left;
-          checkAndSetTsAlias(actualPaths, afterConcatPath);
+            for (List<PartialPath> actualPath : actualPaths) {
+              if (offset != 0) {
+                --offset;
+                continue;
+              } else if (limit != 0) {
+                --limit;
+              } else {
+                break;
+              }
 
-          for (PartialPath actualPath : actualPaths) {
-            newSuffixPathList.add(actualPath);
-            extendListSafely(afterConcatAggregations, i, newAggregations);
+              newSuffixPathList.add(null);
+              extendListSafely(afterConcatAggregations, i, newAggregations);
 
-            newUdfList.add(null);
-          }
+              newUdfList.add(
+                  new UDFContext(originUdf.getName(), originUdf.getAttributes(), actualPath));
+            }
+          } else { // non-udf
+            Pair<List<PartialPath>, Integer> pair = removeWildcard(afterConcatPath, limit, offset);
+            List<PartialPath> actualPaths = pair.left;
+            checkAndSetTsAlias(actualPaths, afterConcatPath);
+
+            for (PartialPath actualPath : actualPaths) {
+              newSuffixPathList.add(actualPath);
+              extendListSafely(afterConcatAggregations, i, newAggregations);
 
-          consumed += pair.right;
-          if (offset != 0) {
-            int delta = offset - pair.right;
-            offset = Math.max(delta, 0);
-            if (delta < 0) {
-              limit += delta;
+              newUdfList.add(null);
+            }
+
+            consumed += pair.right;
+            if (offset != 0) {
+              int delta = offset - pair.right;
+              offset = Math.max(delta, 0);
+              if (delta < 0) {
+                limit += delta;
+              }
+            } else {
+              limit -= pair.right;
             }
-          } else {
-            limit -= pair.right;
           }
-        }
 
-        if (newSuffixPathList.size() > MAX_QUERY_PATH_NUM) {
-          throw new PathNumOverLimitException();
-        }
-        if (limit == 0) {
-          break;
-        }
+          if (newSuffixPathList.size() > MAX_QUERY_PATH_NUM) {
+            throw new PathNumOverLimitException();
+          }
+          if (limit == 0) {
+            break;
+          }
 
-      } catch (MetadataException e) {
-        throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
+        } catch (MetadataException e) {
+          throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
+        }
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index d6f81b0..d9c6c69 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -57,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
     this.context = context;
 
     if (logger.isDebugEnabled()) {
-      logger.debug("paths " + this.paths + " level:" + plan.getLevel());
+      logger.debug("paths " + this.paths + " level:" + Arrays.toString(plan.getLevels()));
     }
 
     Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0371f09..cd8cb99 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -470,7 +470,7 @@ public class AggregationExecutor {
     }
 
     SingleDataSet dataSet;
-    if (plan.getLevel() >= 0) {
+    if (plan.isGroupByLevel()) {
       Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
 
       List<AggregateResult> mergedAggResults =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 71b0219..500cfff 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -106,7 +107,7 @@ public class QueryRouter implements IQueryRouter {
           "paths:"
               + aggregationPlan.getPaths()
               + " level:"
-              + aggregationPlan.getLevel()
+              + Arrays.toString(aggregationPlan.getLevels())
               + " duplicatePaths:"
               + aggregationPlan.getDeduplicatedPaths()
               + " deduplicatePaths:"
@@ -149,7 +150,7 @@ public class QueryRouter implements IQueryRouter {
           IOException {
 
     if (logger.isDebugEnabled()) {
-      logger.debug("paths:" + groupByTimePlan.getPaths() + " level:" + groupByTimePlan.getLevel());
+      logger.debug("paths:{} level:{}", groupByTimePlan.getPaths(), groupByTimePlan.getLevels());
     }
 
     GroupByEngineDataSet dataSet = null;
@@ -177,7 +178,7 @@ public class QueryRouter implements IQueryRouter {
     // we support group by level for count operation
     // details at https://issues.apache.org/jira/browse/IOTDB-622
     // and UserGuide/Operation Manual/DML
-    if (groupByTimePlan.getLevel() >= 0) {
+    if (groupByTimePlan.isGroupByLevel()) {
       return groupByLevelWithoutTimeIntervalDataSet(context, groupByTimePlan, dataSet);
     }
     return dataSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 37127c9..feefe1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -855,7 +855,7 @@ public class TSServiceImpl implements TSIService.Iface {
       // because the query dataset and query id is different although the header of last query is
       // same.
       return StaticResps.LAST_RESP.deepCopy();
-    } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
+    } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).isGroupByLevel()) {
       Map<String, AggregateResult> finalPaths = ((AggregationPlan) plan).getAggPathByLevel();
       for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
         respColumns.add(entry.getKey());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
index 2fbf8ef..6f7eb6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
@@ -36,35 +36,44 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class AggregateUtils {
   /**
-   * Transform an originalPath to a partial path that satisfies given level. Path nodes exceed the
-   * given level will be replaced by "*", e.g. generatePartialPathByLevel("root.sg.dh.d1.s1", 2)
-   * will return "root.sg.dh.*.s1"
+   * Transform an originalPath to a partial path that keeps only the given levels. Every node
+   * between the root and the sensor whose level is not in pathLevels is replaced by "*", e.g.
+   * generatePartialPathByLevel("root.sg.dh.d1.s1", new int[] {2}, false) returns "root.*.dh.*.s1".
+   *
+   * <p>In particular, when isCountStar is true, the sensor level is replaced by "*" as well.
    *
-   * @param originalPath the original timeseries path
-   * @param pathLevel the expected path level
    * @return result partial path
    */
-  public static String generatePartialPathByLevel(String originalPath, int pathLevel)
-      throws IllegalPathException {
-    String[] tmpPath = MetaUtils.splitPathToDetachedPath(originalPath);
-    if (tmpPath.length <= pathLevel) {
-      return originalPath;
+  public static String generatePartialPathByLevel(
+      String originalPath, int[] pathLevels, boolean isCountStar) throws IllegalPathException {
+    String[] nodes = MetaUtils.splitPathToDetachedPath(originalPath);
+    Set<Integer> levelSet = new HashSet<>();
+    for (int level : pathLevels) {
+      levelSet.add(level);
     }
+
     StringBuilder transformedPath = new StringBuilder();
-    transformedPath.append(tmpPath[0]);
-    for (int k = 1; k < tmpPath.length - 1; k++) {
-      if (k <= pathLevel) {
-        transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]);
+    transformedPath.append(nodes[0]).append(TsFileConstant.PATH_SEPARATOR);
+    for (int k = 1; k < nodes.length - 1; k++) {
+      if (levelSet.contains(k)) {
+        transformedPath.append(nodes[k]);
       } else {
-        transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(IoTDBConstant.PATH_WILDCARD);
+        transformedPath.append(IoTDBConstant.PATH_WILDCARD);
       }
+      transformedPath.append(TsFileConstant.PATH_SEPARATOR);
+    }
+    if (isCountStar) {
+      transformedPath.append(IoTDBConstant.PATH_WILDCARD);
+    } else {
+      transformedPath.append(nodes[nodes.length - 1]);
     }
-    transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[tmpPath.length - 1]);
     return transformedPath.toString();
   }
 
@@ -140,23 +149,23 @@ public class AggregateUtils {
 
     List<AggregateResult> resultSet = new ArrayList<>();
     List<PartialPath> dupPaths = plan.getDeduplicatedPaths();
-    try {
-      for (int i = 0; i < aggResults.size(); i++) {
-        if (aggResults.get(i) != null) {
-          String transformedPath =
-              generatePartialPathByLevel(dupPaths.get(i).getFullPath(), plan.getLevel());
-          String key = plan.getDeduplicatedAggregations().get(i) + "(" + transformedPath + ")";
-          AggregateResult tempAggResult = finalPaths.get(key);
-          if (tempAggResult == null) {
-            finalPaths.put(key, aggResults.get(i));
-          } else {
-            tempAggResult.merge(aggResults.get(i));
-            finalPaths.put(key, tempAggResult);
-          }
+    for (int i = 0; i < aggResults.size(); i++) {
+      if (aggResults.get(i) != null) {
+        //          String transformedPath =
+        //              generatePartialPathByLevel(
+        //                  dupPaths.get(i).getFullPath(), plan.getLevels(), plan.isCountStar());
+        //          String key = plan.getDeduplicatedAggregations().get(i) + "(" + transformedPath +
+        // ")";
+        String rawPath = plan.getDeduplicatedAggregations().get(i) + "(" + dupPaths.get(i).getFullPath() + ")";
+        String key = plan.getGroupByLevelController().getGroupedPath(rawPath);
+        AggregateResult tempAggResult = finalPaths.get(key);
+        if (tempAggResult == null) {
+          finalPaths.put(key, aggResults.get(i));
+        } else {
+          tempAggResult.merge(aggResults.get(i));
+          finalPaths.put(key, tempAggResult);
         }
       }
-    } catch (IllegalPathException e) {
-      throw new QueryProcessException(e.getMessage());
     }
 
     for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
index 602ee20..4518a02 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
@@ -147,5 +147,15 @@ public class GroupByLevelDataSetTest {
 
     assertTrue(dataSet.hasNext());
     assertEquals("0\t1", dataSet.next().toString());
+
+    // multi level
+    queryPlan =
+        (QueryPlan)
+            processor.parseSQLToPhysicalPlan(
+                "select count(s1) from root.test.*,root.vehicle.* group by level=1,2");
+    dataSet = queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+
+    assertTrue(dataSet.hasNext());
+    assertEquals("0\t12\t10", dataSet.next().toString());
   }
 }