You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/11/29 03:08:41 UTC
[iotdb] 01/01: IOTDB-1823 group by multi level
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch group_by_multi_level_1129
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fdfb4ae642f0942a75f5549cf8716ac81175ea70
Author: chaow <94...@qq.com>
AuthorDate: Mon Nov 29 11:07:36 2021 +0800
IOTDB-1823 group by multi level
---
.../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 4 +-
client-cpp/pom.xml | 4 +-
compile-tools/pom.xml | 6 +-
distribution/pom.xml | 2 +-
example/client-cpp-example/pom.xml | 2 +-
example/udf/pom.xml | 2 +-
grafana/pom.xml | 2 +-
jdbc/pom.xml | 2 +-
pom.xml | 8 +-
.../db/qp/logical/crud/GroupByLevelController.java | 116 ++++++++++++++
.../iotdb/db/qp/logical/crud/QueryOperator.java | 31 ++--
.../iotdb/db/qp/logical/crud/SFWOperator.java | 10 ++
.../iotdb/db/qp/logical/crud/SelectOperator.java | 40 +++++
.../iotdb/db/qp/physical/crud/AggregationPlan.java | 63 ++++----
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 18 ++-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 8 +-
.../qp/strategy/optimizer/ConcatPathOptimizer.java | 172 +++++++++++++--------
.../query/dataset/groupby/GroupByTimeDataSet.java | 3 +-
.../db/query/executor/AggregationExecutor.java | 2 +-
.../iotdb/db/query/executor/QueryRouter.java | 7 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 2 +-
.../org/apache/iotdb/db/utils/AggregateUtils.java | 71 +++++----
.../dataset/groupby/GroupByLevelDataSetTest.java | 10 ++
23 files changed, 420 insertions(+), 165 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
index a0c49c6..0062431 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4
@@ -322,7 +322,7 @@ groupByTimeClause
COMMA DURATION
(COMMA DURATION)?
RR_BRACKET
- COMMA LEVEL OPERATOR_EQ INT
+ COMMA LEVEL OPERATOR_EQ INT (COMMA INT)*
;
groupByFillClause
@@ -334,7 +334,7 @@ groupByFillClause
;
groupByLevelClause
- : GROUP BY LEVEL OPERATOR_EQ INT
+ : GROUP BY LEVEL OPERATOR_EQ INT (COMMA INT)*
;
typeClause
diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index c24caa1..a44b847 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -98,8 +98,8 @@
<cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
<thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
<iotdb.server.script>start-server.bat</iotdb.server.script>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
<profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index 188ac3a..c046288 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
<cmake-version>3.17.3</cmake-version>
<openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
<bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
- <cmake.build.type />
+ <cmake.build.type/>
</properties>
<modules>
<module>thrift</module>
@@ -114,8 +114,8 @@
<thrift.make.executable>make</thrift.make.executable>
<thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
<gradlew.executable>gradlew.bat</gradlew.executable>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
</profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ef366ac..0ff89bc 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
</parent>
<artifactId>iotdb-distribution</artifactId>
<name>IoTDB Distribution</name>
- <modules />
+ <modules/>
<build>
<plugins>
<plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index cb6069a..d7cd7ee 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -69,7 +69,7 @@
<properties>
<cmake.generator>Visual Studio 16 2019</cmake.generator>
<cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
- <boost.include.dir />
+ <boost.include.dir/>
</properties>
</profile>
<profile>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index c051c61..62c85c7 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -77,7 +77,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
</configuration>
<executions>
diff --git a/grafana/pom.xml b/grafana/pom.xml
index 8a0c7c2..0b39db1 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -170,7 +170,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 6a5eb43..5cb73c0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -198,7 +198,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
diff --git a/pom.xml b/pom.xml
index 7e59148..d64a674 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
<sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
<!-- By default, the argLine is empty-->
<gson.version>2.8.6</gson.version>
- <argLine />
+ <argLine/>
<!-- whether enable compiling the cpp client-->
<client-cpp>false</client-cpp>
<!-- disable enforcer by default-->
@@ -688,7 +688,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
<lineEndings>UNIX</lineEndings>
</configuration>
@@ -760,7 +760,7 @@
<phase>validate</phase>
<configuration>
<rules>
- <dependencyConvergence />
+ <dependencyConvergence/>
</rules>
</configuration>
<goals>
@@ -806,7 +806,7 @@
</requireJavaVersion>
<!-- Disabled for now as it breaks the ability to build single modules -->
<!--reactorModuleConvergence/-->
- <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+ <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
</rules>
</configuration>
</execution>
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByLevelController.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByLevelController.java
new file mode 100644
index 0000000..c9ade35
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByLevelController.java
@@ -0,0 +1,116 @@
+package org.apache.iotdb.db.qp.logical.crud;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used to control the result columns of a group by level query. For example, with
+ * selected series [root.sg.d1.s1, root.sg.d2.s1, root.sg2.d1.s1] and level = 1, the result columns
+ * will be [root.sg.*.s1, root.sg2.*.s1]; sLimit and sOffset are applied to these grouped result
+ * columns rather than to the selected series.
+ */
+public class GroupByLevelController {
+
+ private final int seriesLimit;
+ private int seriesOffset;
+ Set<String> limitPaths;
+ Set<String> offsetPaths;
+ private final int[] levels;
+ int prevSize = 0;
+ /** raw path -> grouped path, e.g. count(root.sg.d1.s1) with level = 1 -> count(root.sg.*.s1) */
+ private Map<String, String> groupedPathMap;
+
+ public GroupByLevelController(int seriesLimit, int seriesOffset, int[] levels) {
+ this.seriesLimit = seriesLimit;
+ this.seriesOffset = seriesOffset;
+ this.limitPaths = seriesLimit > 0 ? new HashSet<>() : null;
+ this.offsetPaths = seriesOffset > 0 ? new HashSet<>() : null;
+ this.groupedPathMap = new LinkedHashMap<>();
+ this.levels = levels;
+ }
+
+ public String getGroupedPath(String rawPath) {
+ return groupedPathMap.get(rawPath);
+ }
+
+ public void control(List<PartialPath> resultColumns, boolean isCountStar, String functionName) {
+ Iterator<PartialPath> iterator = resultColumns.iterator();
+ for (int i = 0; i < prevSize; i++) {
+ iterator.next();
+ }
+ while (iterator.hasNext()) {
+ PartialPath resultColumn = iterator.next();
+ String groupedPath = generatePartialPathByLevel(resultColumn, levels, isCountStar);
+ String rawPath = String.format("%s(%s)", functionName, resultColumn.getFullPath());
+ String pathWithFunction = String.format("%s(%s)", functionName, groupedPath);
+
+ if (seriesLimit == 0 && seriesOffset == 0) {
+ groupedPathMap.put(rawPath, pathWithFunction);
+ } else {
+ if (seriesOffset > 0 && offsetPaths != null) {
+ offsetPaths.add(pathWithFunction);
+ if (offsetPaths.size() <= seriesOffset) {
+ iterator.remove();
+ if (offsetPaths.size() == seriesOffset) {
+ seriesOffset = 0;
+ }
+ }
+ } else if (offsetPaths == null || !offsetPaths.contains(pathWithFunction)) {
+ limitPaths.add(pathWithFunction);
+ if (seriesLimit > 0 && limitPaths.size() > seriesLimit) {
+ iterator.remove();
+ limitPaths.remove(pathWithFunction);
+ } else {
+ groupedPathMap.put(rawPath, pathWithFunction);
+ }
+ } else {
+ iterator.remove();
+ }
+ }
+ }
+ prevSize = resultColumns.size();
+ }
+
+ /**
+ * Transform an originalPath to a partial path that satisfies the given levels. Path nodes that
+ * are not at one of the given levels will be replaced by "*", except the sensor level; e.g. for
+ * "root.sg.dh.d1.s1" with pathLevels = [2], the result is "root.*.dh.*.s1".
+ *
+ * <p>In particular, for count(*), the sensor level will be replaced by "*" too.
+ *
+ * @return result partial path
+ */
+ public static String generatePartialPathByLevel(
+ PartialPath originalPath, int[] pathLevels, boolean isCountStar) {
+ String[] nodes = originalPath.getNodes();
+ Set<Integer> levelSet = new HashSet<>();
+ for (int level : pathLevels) {
+ levelSet.add(level);
+ }
+
+ StringBuilder transformedPath = new StringBuilder();
+ transformedPath.append(nodes[0]).append(TsFileConstant.PATH_SEPARATOR);
+ for (int k = 1; k < nodes.length - 1; k++) {
+ if (levelSet.contains(k)) {
+ transformedPath.append(nodes[k]);
+ } else {
+ transformedPath.append(IoTDBConstant.PATH_WILDCARD);
+ }
+ transformedPath.append(TsFileConstant.PATH_SEPARATOR);
+ }
+ if (isCountStar) {
+ transformedPath.append(IoTDBConstant.PATH_WILDCARD);
+ } else {
+ transformedPath.append(nodes[nodes.length - 1]);
+ }
+ return transformedPath.toString();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index c640470..084627c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -43,9 +43,6 @@ public class QueryOperator extends SFWOperator {
private Map<TSDataType, IFill> fillTypes;
private boolean isFill = false;
- private boolean isGroupByLevel = false;
- private int level = -1;
-
private int rowLimit = 0;
private int rowOffset = 0;
private int seriesLimit = 0;
@@ -105,12 +102,24 @@ public class QueryOperator extends SFWOperator {
this.fillTypes = fillTypes;
}
- public boolean isGroupByLevel() {
- return isGroupByLevel;
+ public int[] getLevels() {
+ if (getSelectOperator() != null) {
+ return getSelectOperator().getLevels();
+ }
+ return null;
}
- public void setGroupByLevel(boolean isGroupBy) {
- this.isGroupByLevel = isGroupBy;
+ public void setLevels(int[] levels) {
+ if (getSelectOperator() != null) {
+ getSelectOperator().setLevels(levels);
+ }
+ }
+
+ public boolean isGroupByLevel() {
+ if (getSelectOperator() != null) {
+ return getSelectOperator().isGroupByLevel();
+ }
+ return false;
}
public boolean isLeftCRightO() {
@@ -209,14 +218,6 @@ public class QueryOperator extends SFWOperator {
this.isAlignByTime = isAlignByTime;
}
- public int getLevel() {
- return level;
- }
-
- public void setLevel(int level) {
- this.level = level;
- }
-
public boolean isGroupByTime() {
return isGroupByTime;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java
index 14d510b..5eb3820 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SFWOperator.java
@@ -95,4 +95,14 @@ public abstract class SFWOperator extends RootOperator {
public boolean isLastQuery() {
return lastQuery;
}
+
+ public boolean isCountStar() {
+ return selectOperator != null && selectOperator.isCountStar();
+ }
+
+ public void checkCountStar() {
+ if (selectOperator != null) {
+ selectOperator.checkCountStar();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
index 5b34ea5..eced489 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java
@@ -18,7 +18,9 @@
*/
package org.apache.iotdb.db.qp.logical.crud;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.query.udf.core.context.UDFContext;
@@ -38,6 +40,10 @@ public final class SelectOperator extends Operator {
private boolean udfQuery;
private boolean hasBuiltinAggregation;
+ private GroupByLevelController groupByLevelController;
+ private boolean isCountStar;
+ private int[] levels;
+
/** init with tokenIntType, default operatorType is <code>OperatorType.SELECT</code>. */
public SelectOperator(int tokenIntType, ZoneId zoneId) {
super(tokenIntType);
@@ -113,4 +119,38 @@ public final class SelectOperator extends Operator {
public void setUdfList(List<UDFContext> udfList) {
this.udfList = udfList;
}
+
+ public boolean isCountStar() {
+ return isCountStar;
+ }
+
+ public void checkCountStar() {
+ if (hasAggregation()
+ && getAggregations().size() == 1
+ && getAggregations().get(0).equals(SQLConstant.COUNT)
+ && getSuffixPaths().size() == 1
+ && getSuffixPaths().get(0).equals(IoTDBConstant.PATH_WILDCARD)) {
+ isCountStar = true;
+ }
+ }
+
+ public int[] getLevels() {
+ return levels;
+ }
+
+ public void setLevels(int[] levels) {
+ this.levels = levels;
+ }
+
+ public boolean isGroupByLevel() {
+ return !(levels == null || levels.length == 0);
+ }
+
+ public GroupByLevelController getGroupByLevelController() {
+ return groupByLevelController;
+ }
+
+ public void setGroupByLevelController(GroupByLevelController groupByLevelController) {
+ this.groupByLevelController = groupByLevelController;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
index d2c8a5d..cb4671a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java
@@ -22,9 +22,9 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.GroupByLevelController;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.utils.AggregateUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
@@ -40,8 +40,10 @@ public class AggregationPlan extends RawDataQueryPlan {
private List<String> aggregations = new ArrayList<>();
private List<String> deduplicatedAggregations = new ArrayList<>();
+ private GroupByLevelController groupByLevelController;
+ private boolean isCountStar;
- private int level = -1;
+ private int[] levels;
// group by level aggregation result path
private final Map<String, AggregateResult> levelAggPaths = new LinkedHashMap<>();
@@ -71,12 +73,24 @@ public class AggregationPlan extends RawDataQueryPlan {
this.deduplicatedAggregations = deduplicatedAggregations;
}
- public int getLevel() {
- return level;
+ public int[] getLevels() {
+ return levels;
}
- public void setLevel(int level) {
- this.level = level;
+ public void setLevels(int[] levels) {
+ this.levels = levels;
+ }
+
+ public boolean isGroupByLevel() {
+ return !(levels == null || levels.length == 0);
+ }
+
+ public boolean isCountStar() {
+ return isCountStar;
+ }
+
+ public void setCountStar(boolean countStar) {
+ isCountStar = countStar;
}
public Map<String, AggregateResult> getAggPathByLevel() throws QueryProcessException {
@@ -85,20 +99,14 @@ public class AggregationPlan extends RawDataQueryPlan {
}
List<PartialPath> seriesPaths = getPaths();
List<TSDataType> dataTypes = getDataTypes();
- try {
- for (int i = 0; i < seriesPaths.size(); i++) {
- String transformedPath =
- AggregateUtils.generatePartialPathByLevel(seriesPaths.get(i).getFullPath(), getLevel());
- String key = getAggregations().get(i) + "(" + transformedPath + ")";
- if (!levelAggPaths.containsKey(key)) {
- AggregateResult aggRet =
- AggregateResultFactory.getAggrResultByName(
- getAggregations().get(i), dataTypes.get(i));
- levelAggPaths.put(key, aggRet);
- }
+ for (int i = 0; i < seriesPaths.size(); i++) {
+ String rawPath = getAggregations().get(i) + "(" + seriesPaths.get(i).getFullPath() + ")";
+ String key = groupByLevelController.getGroupedPath(rawPath);
+ if (!levelAggPaths.containsKey(key)) {
+ AggregateResult aggRet =
+ AggregateResultFactory.getAggrResultByName(getAggregations().get(i), dataTypes.get(i));
+ levelAggPaths.put(key, aggRet);
}
- } catch (IllegalPathException e) {
- throw new QueryProcessException(e.getMessage());
}
return levelAggPaths;
}
@@ -124,14 +132,17 @@ public class AggregationPlan extends RawDataQueryPlan {
public String getColumnForDisplay(String columnForReader, int pathIndex)
throws IllegalPathException {
String columnForDisplay = columnForReader;
- if (level >= 0) {
- PartialPath path = paths.get(pathIndex);
- String aggregatePath =
- path.isMeasurementAliasExists()
- ? AggregateUtils.generatePartialPathByLevel(path.getFullPathWithAlias(), level)
- : AggregateUtils.generatePartialPathByLevel(path.toString(), level);
- columnForDisplay = aggregations.get(pathIndex) + "(" + aggregatePath + ")";
+ if (isGroupByLevel()) {
+ columnForDisplay = groupByLevelController.getGroupedPath(columnForDisplay);
}
return columnForDisplay;
}
+
+ public GroupByLevelController getGroupByLevelController() {
+ return groupByLevelController;
+ }
+
+ public void setGroupByLevelController(GroupByLevelController groupByLevelController) {
+ this.groupByLevelController = groupByLevelController;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 89fdf2e..ded058f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -1377,8 +1377,17 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
if (!queryOp.hasAggregation()) {
throw new SQLParserException("There is no aggregation function with group by query");
}
- queryOp.setGroupByLevel(true);
- queryOp.setLevel(Integer.parseInt(ctx.INT().getText()));
+ setLevels(queryOp, ctx.LEVEL(), ctx.INT());
+ }
+
+ private void setLevels(QueryOperator queryOp, TerminalNode level, List<TerminalNode> anInt) {
+ if (level != null && anInt != null) {
+ int[] levels = new int[anInt.size()];
+ for (int i = 0; i < anInt.size(); i++) {
+ levels[i] = Integer.parseInt(anInt.get(i).getText());
+ }
+ queryOp.setLevels(levels);
+ }
}
public void parseFillClause(FillClauseContext ctx, QueryOperator queryOp) {
@@ -1489,10 +1498,7 @@ public class IoTDBSqlVisitor extends SqlBaseBaseVisitor<Operator> {
parseTimeInterval(ctx.timeInterval(), queryOp);
- if (ctx.INT() != null) {
- queryOp.setGroupByLevel(true);
- queryOp.setLevel(Integer.parseInt(ctx.INT().getText()));
- }
+ setLevels(queryOp, ctx.LEVEL(), ctx.INT());
}
private void parseGroupByFillClause(GroupByFillClauseContext ctx, QueryOperator queryOp) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 8802493..0cf097c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -499,7 +499,11 @@ public class PhysicalGenerator {
}
}
} else if (queryOperator.isGroupByLevel()) {
- ((AggregationPlan) queryPlan).setLevel(queryOperator.getLevel());
+ ((AggregationPlan) queryPlan)
+ .setGroupByLevelController(
+ queryOperator.getSelectOperator().getGroupByLevelController());
+ ((AggregationPlan) queryPlan).setCountStar(queryOperator.isCountStar());
+ ((AggregationPlan) queryPlan).setLevels(queryOperator.getLevels());
try {
if (!verifyAllAggregationDataTypesEqual(queryOperator)) {
throw new QueryProcessException("Aggregate among unmatched data types");
@@ -618,7 +622,7 @@ public class PhysicalGenerator {
} else if (queryPlan instanceof FillQueryPlan) {
alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan);
} else if (queryPlan instanceof AggregationPlan) {
- if (((AggregationPlan) queryPlan).getLevel() >= 0) {
+ if (((AggregationPlan) queryPlan).isGroupByLevel()) {
throw new QueryProcessException("group by level does not support align by device now.");
}
alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 4432eb5..cd1bb24 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
import org.apache.iotdb.db.qp.logical.crud.FromOperator;
import org.apache.iotdb.db.qp.logical.crud.FunctionOperator;
+import org.apache.iotdb.db.qp.logical.crud.GroupByLevelController;
import org.apache.iotdb.db.qp.logical.crud.InOperator;
import org.apache.iotdb.db.qp.logical.crud.LikeOperator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -92,6 +94,7 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
logger.warn(WARNING_NO_SUFFIX_PATHS);
return operator;
}
+ sfwOperator.checkCountStar();
}
checkAggrOfSelectOperator(select);
@@ -195,6 +198,16 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
List<UDFContext> originUdfList = selectOperator.getUdfList();
List<UDFContext> afterConcatUdfList = new ArrayList<>();
+ // TODO how to get the final result column of group by level
+
+ // 1. group by level, so need to get all measurements for each function one by one
+
+ // 2. group by level, and apply soffset and slimit
+
+ // 3. continue with step 1, or go to step 4 when slimit is reached or the function list ends
+
+ // 4. put all satisfying paths into the operator and aggregation
+
for (int i = 0; i < suffixPaths.size(); i++) {
// selectPath cannot start with ROOT, which is guaranteed by TSParser
PartialPath selectPath = suffixPaths.get(i);
@@ -383,81 +396,114 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
List<String> newAggregations = new ArrayList<>();
List<UDFContext> newUdfList = new ArrayList<>();
- for (int i = 0; i < afterConcatPaths.size(); i++) {
- try {
- PartialPath afterConcatPath = afterConcatPaths.get(i);
-
- if (afterConcatPath == null) { // udf
- UDFContext originUdf = afterConcatUdfList.get(i);
- List<PartialPath> originPaths = originUdf.getPaths();
- List<List<PartialPath>> extendedPaths = new ArrayList<>();
-
- boolean atLeastOneSeriesNotExisted = false;
- for (PartialPath originPath : originPaths) {
- List<PartialPath> actualPaths = removeWildcard(originPath, 0, 0).left;
- if (actualPaths.isEmpty()) {
- atLeastOneSeriesNotExisted = true;
- break;
- }
- checkAndSetTsAlias(actualPaths, originPath);
- extendedPaths.add(actualPaths);
- }
- if (atLeastOneSeriesNotExisted) {
- continue;
- }
-
- List<List<PartialPath>> actualPaths = new ArrayList<>();
- cartesianProduct(extendedPaths, actualPaths, 0, new ArrayList<>());
+ if (selectOperator.isGroupByLevel()) {
+ GroupByLevelController groupByLevelController =
+ new GroupByLevelController(limit, offset, selectOperator.getLevels());
+ selectOperator.setGroupByLevelController(groupByLevelController);
+ // 1. group by level, so need to get all measurements for each function one by one
+ List<PartialPath> resultColumns = new LinkedList<>();
+ int preSize = 0;
+ for (int i = 0; i < afterConcatPaths.size(); i++) {
+ try {
+ PartialPath afterConcatPath = afterConcatPaths.get(i);
+
+ Pair<List<PartialPath>, Integer> pair = removeWildcard(afterConcatPath, 0, 0);
+ List<PartialPath> truePaths = pair.left;
+ checkAndSetTsAlias(truePaths, afterConcatPath);
+ resultColumns.addAll(truePaths);
+
+ groupByLevelController.control(
+ resultColumns, selectOperator.isCountStar(), afterConcatAggregations.get(i));
+
+ } catch (MetadataException e) {
+ throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
+ }
- for (List<PartialPath> actualPath : actualPaths) {
- if (offset != 0) {
- --offset;
+ for (int j = preSize; j < resultColumns.size(); j++) {
+ extendListSafely(afterConcatAggregations, i, newAggregations);
+ newUdfList.add(null);
+ }
+ preSize = resultColumns.size();
+ }
+ consumed = resultColumns.size();
+ newSuffixPathList.addAll(resultColumns);
+ } else {
+ for (int i = 0; i < afterConcatPaths.size(); i++) {
+ try {
+ PartialPath afterConcatPath = afterConcatPaths.get(i);
+
+ if (afterConcatPath == null) { // udf
+ UDFContext originUdf = afterConcatUdfList.get(i);
+ List<PartialPath> originPaths = originUdf.getPaths();
+ List<List<PartialPath>> extendedPaths = new ArrayList<>();
+
+ boolean atLeastOneSeriesNotExisted = false;
+ for (PartialPath originPath : originPaths) {
+ List<PartialPath> actualPaths = removeWildcard(originPath, 0, 0).left;
+ if (actualPaths.isEmpty()) {
+ atLeastOneSeriesNotExisted = true;
+ break;
+ }
+ checkAndSetTsAlias(actualPaths, originPath);
+ extendedPaths.add(actualPaths);
+ }
+ if (atLeastOneSeriesNotExisted) {
continue;
- } else if (limit != 0) {
- --limit;
- } else {
- break;
}
- newSuffixPathList.add(null);
- extendListSafely(afterConcatAggregations, i, newAggregations);
+ List<List<PartialPath>> actualPaths = new ArrayList<>();
+ cartesianProduct(extendedPaths, actualPaths, 0, new ArrayList<>());
- newUdfList.add(
- new UDFContext(originUdf.getName(), originUdf.getAttributes(), actualPath));
- }
- } else { // non-udf
- Pair<List<PartialPath>, Integer> pair = removeWildcard(afterConcatPath, limit, offset);
- List<PartialPath> actualPaths = pair.left;
- checkAndSetTsAlias(actualPaths, afterConcatPath);
+ for (List<PartialPath> actualPath : actualPaths) {
+ if (offset != 0) {
+ --offset;
+ continue;
+ } else if (limit != 0) {
+ --limit;
+ } else {
+ break;
+ }
- for (PartialPath actualPath : actualPaths) {
- newSuffixPathList.add(actualPath);
- extendListSafely(afterConcatAggregations, i, newAggregations);
+ newSuffixPathList.add(null);
+ extendListSafely(afterConcatAggregations, i, newAggregations);
- newUdfList.add(null);
- }
+ newUdfList.add(
+ new UDFContext(originUdf.getName(), originUdf.getAttributes(), actualPath));
+ }
+ } else { // non-udf
+ Pair<List<PartialPath>, Integer> pair = removeWildcard(afterConcatPath, limit, offset);
+ List<PartialPath> actualPaths = pair.left;
+ checkAndSetTsAlias(actualPaths, afterConcatPath);
+
+ for (PartialPath actualPath : actualPaths) {
+ newSuffixPathList.add(actualPath);
+ extendListSafely(afterConcatAggregations, i, newAggregations);
- consumed += pair.right;
- if (offset != 0) {
- int delta = offset - pair.right;
- offset = Math.max(delta, 0);
- if (delta < 0) {
- limit += delta;
+ newUdfList.add(null);
+ }
+
+ consumed += pair.right;
+ if (offset != 0) {
+ int delta = offset - pair.right;
+ offset = Math.max(delta, 0);
+ if (delta < 0) {
+ limit += delta;
+ }
+ } else {
+ limit -= pair.right;
}
- } else {
- limit -= pair.right;
}
- }
- if (newSuffixPathList.size() > MAX_QUERY_PATH_NUM) {
- throw new PathNumOverLimitException();
- }
- if (limit == 0) {
- break;
- }
+ if (newSuffixPathList.size() > MAX_QUERY_PATH_NUM) {
+ throw new PathNumOverLimitException();
+ }
+ if (limit == 0) {
+ break;
+ }
- } catch (MetadataException e) {
- throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
+ } catch (MetadataException e) {
+ throw new LogicalOptimizeException("error when remove star: " + e.getMessage());
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index d6f81b0..d9c6c69 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -57,7 +58,7 @@ public class GroupByTimeDataSet extends QueryDataSet {
this.context = context;
if (logger.isDebugEnabled()) {
- logger.debug("paths " + this.paths + " level:" + plan.getLevel());
+ logger.debug("paths " + this.paths + " level:" + Arrays.toString(plan.getLevels()));
}
Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 0371f09..cd8cb99 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -470,7 +470,7 @@ public class AggregationExecutor {
}
SingleDataSet dataSet;
- if (plan.getLevel() >= 0) {
+ if (plan.isGroupByLevel()) {
Map<String, AggregateResult> finalPaths = plan.getAggPathByLevel();
List<AggregateResult> mergedAggResults =
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 71b0219..500cfff 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
@@ -106,7 +107,7 @@ public class QueryRouter implements IQueryRouter {
"paths:"
+ aggregationPlan.getPaths()
+ " level:"
- + aggregationPlan.getLevel()
+ + Arrays.toString(aggregationPlan.getLevels())
+ " duplicatePaths:"
+ aggregationPlan.getDeduplicatedPaths()
+ " deduplicatePaths:"
@@ -149,7 +150,7 @@ public class QueryRouter implements IQueryRouter {
IOException {
if (logger.isDebugEnabled()) {
- logger.debug("paths:" + groupByTimePlan.getPaths() + " level:" + groupByTimePlan.getLevel());
+ logger.debug("paths:" + groupByTimePlan.getPaths() + " level:" + Arrays.toString(groupByTimePlan.getLevels()));
}
GroupByEngineDataSet dataSet = null;
@@ -177,7 +178,7 @@ public class QueryRouter implements IQueryRouter {
// we support group by level for count operation
// details at https://issues.apache.org/jira/browse/IOTDB-622
// and UserGuide/Operation Manual/DML
- if (groupByTimePlan.getLevel() >= 0) {
+ if (groupByTimePlan.isGroupByLevel()) {
return groupByLevelWithoutTimeIntervalDataSet(context, groupByTimePlan, dataSet);
}
return dataSet;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 37127c9..feefe1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -855,7 +855,7 @@ public class TSServiceImpl implements TSIService.Iface {
// because the query dataset and query id is different although the header of last query is
// same.
return StaticResps.LAST_RESP.deepCopy();
- } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).getLevel() >= 0) {
+ } else if (plan instanceof AggregationPlan && ((AggregationPlan) plan).isGroupByLevel()) {
Map<String, AggregateResult> finalPaths = ((AggregationPlan) plan).getAggPathByLevel();
for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
respColumns.add(entry.getKey());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
index 2fbf8ef..6f7eb6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/AggregateUtils.java
@@ -36,35 +36,44 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class AggregateUtils {
/**
- * Transform an originalPath to a partial path that satisfies given level. Path nodes exceed the
- * given level will be replaced by "*", e.g. generatePartialPathByLevel("root.sg.dh.d1.s1", 2)
- * will return "root.sg.dh.*.s1"
+ * Transform an originalPath to a partial path that satisfies the given levels. Path nodes that
+ * are not at one of the given levels are replaced by "*", except the root and sensor nodes, e.g.
+ * generatePartialPathByLevel("root.sg.dh.d1.s1", new int[] {2}, false) returns "root.*.dh.*.s1".
+ *
+ * <p>In particular, if isCountStar is true (count(*)), the sensor node is replaced by "*" too.
*
- * @param originalPath the original timeseries path
- * @param pathLevel the expected path level
* @return result partial path
*/
- public static String generatePartialPathByLevel(String originalPath, int pathLevel)
- throws IllegalPathException {
- String[] tmpPath = MetaUtils.splitPathToDetachedPath(originalPath);
- if (tmpPath.length <= pathLevel) {
- return originalPath;
+ public static String generatePartialPathByLevel(
+ String originalPath, int[] pathLevels, boolean isCountStar) throws IllegalPathException {
+ String[] nodes = MetaUtils.splitPathToDetachedPath(originalPath);
+ Set<Integer> levelSet = new HashSet<>();
+ for (int level : pathLevels) {
+ levelSet.add(level);
}
+
StringBuilder transformedPath = new StringBuilder();
- transformedPath.append(tmpPath[0]);
- for (int k = 1; k < tmpPath.length - 1; k++) {
- if (k <= pathLevel) {
- transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[k]);
+ transformedPath.append(nodes[0]).append(TsFileConstant.PATH_SEPARATOR);
+ for (int k = 1; k < nodes.length - 1; k++) {
+ if (levelSet.contains(k)) {
+ transformedPath.append(nodes[k]);
} else {
- transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(IoTDBConstant.PATH_WILDCARD);
+ transformedPath.append(IoTDBConstant.PATH_WILDCARD);
}
+ transformedPath.append(TsFileConstant.PATH_SEPARATOR);
+ }
+ if (isCountStar) {
+ transformedPath.append(IoTDBConstant.PATH_WILDCARD);
+ } else {
+ transformedPath.append(nodes[nodes.length - 1]);
}
- transformedPath.append(TsFileConstant.PATH_SEPARATOR).append(tmpPath[tmpPath.length - 1]);
return transformedPath.toString();
}
@@ -140,23 +149,23 @@ public class AggregateUtils {
List<AggregateResult> resultSet = new ArrayList<>();
List<PartialPath> dupPaths = plan.getDeduplicatedPaths();
- try {
- for (int i = 0; i < aggResults.size(); i++) {
- if (aggResults.get(i) != null) {
- String transformedPath =
- generatePartialPathByLevel(dupPaths.get(i).getFullPath(), plan.getLevel());
- String key = plan.getDeduplicatedAggregations().get(i) + "(" + transformedPath + ")";
- AggregateResult tempAggResult = finalPaths.get(key);
- if (tempAggResult == null) {
- finalPaths.put(key, aggResults.get(i));
- } else {
- tempAggResult.merge(aggResults.get(i));
- finalPaths.put(key, tempAggResult);
- }
+ for (int i = 0; i < aggResults.size(); i++) {
+ if (aggResults.get(i) != null) {
+ // String transformedPath =
+ // generatePartialPathByLevel(
+ // dupPaths.get(i).getFullPath(), plan.getLevels(), plan.isCountStar());
+ // String key = plan.getDeduplicatedAggregations().get(i) + "(" + transformedPath +
+ // ")";
+ String rawPath = plan.getDeduplicatedAggregations().get(i) + "(" + dupPaths.get(i).getFullPath() + ")";
+ String key = plan.getGroupByLevelController().getGroupedPath(rawPath);
+ AggregateResult tempAggResult = finalPaths.get(key);
+ if (tempAggResult == null) {
+ finalPaths.put(key, aggResults.get(i));
+ } else {
+ tempAggResult.merge(aggResults.get(i));
+ finalPaths.put(key, tempAggResult);
}
}
- } catch (IllegalPathException e) {
- throw new QueryProcessException(e.getMessage());
}
for (Map.Entry<String, AggregateResult> entry : finalPaths.entrySet()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
index 602ee20..4518a02 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByLevelDataSetTest.java
@@ -147,5 +147,15 @@ public class GroupByLevelDataSetTest {
assertTrue(dataSet.hasNext());
assertEquals("0\t1", dataSet.next().toString());
+
+ // multi level
+ queryPlan =
+ (QueryPlan)
+ processor.parseSQLToPhysicalPlan(
+ "select count(s1) from root.test.*,root.vehicle.* group by level=1,2");
+ dataSet = queryExecutor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+
+ assertTrue(dataSet.hasNext());
+ assertEquals("0\t12\t10", dataSet.next().toString());
}
}