You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/03/09 01:26:46 UTC
[iotdb] 01/01: [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)
This is an automated email from the ASF dual-hosted git repository.
ericpai pushed a commit to branch bugfix/iotdb-5619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d72bb3222c3b88cb761599246bb4250ce3258478
Author: BaiJian <er...@hotmail.com>
AuthorDate: Wed Mar 8 13:56:04 2023 +0800
[IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)
---
.../db/it/aggregation/IoTDBTagAggregationIT.java | 55 +++++++++++++++++++++-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 1 +
.../db/mpp/plan/planner/SubPlanTypeExtractor.java | 16 ++++---
.../planner/plan/node/process/GroupByTagNode.java | 27 +++++++++++
.../plan/node/process/GroupByTagNodeSerdeTest.java | 2 +-
5 files changed, 92 insertions(+), 9 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
index 9ca72958e5..acd72723a0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
@@ -45,11 +45,13 @@ import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
public class IoTDBTagAggregationIT {
+ private final double E = 0.00001D;
protected static final String[] DATASET =
new String[] {
"CREATE DATABASE root.sg.a;",
"CREATE DATABASE root.sg.b;",
"CREATE DATABASE root.sg2.c;",
+ "CREATE DATABASE root.case2;",
"create timeseries root.sg.a.d1.t with datatype=FLOAT tags(k1=k1v1, k2=k2v1, k3=k3v1);",
"create timeseries root.sg.b.d2.t with datatype=FLOAT tags(k1=k1v1, k2=k2v2);",
"create timeseries root.sg.a.d3.t with datatype=FLOAT tags(k1=k1v2, k2=k2v1);",
@@ -72,7 +74,17 @@ public class IoTDBTagAggregationIT {
"insert into root.sg.b.d4(time, t) values(10, 5.4);",
"insert into root.sg.a.d5(time, t) values(10, 6.5);",
"insert into root.sg.b.d6(time, t) values(10, 7.6);",
- "insert into root.sg.a.d7(time, t) values(10, 8.7);"
+ "insert into root.sg.a.d7(time, t) values(10, 8.7);",
+
+ // test multiple tag values with multiple aggregation columns
+ "create timeseries root.case2.d1.s1 with datatype=FLOAT tags(k1=v1);",
+ "create timeseries root.case2.d2.s1 with datatype=FLOAT tags(k1=v1);",
+ "create timeseries root.case2.d1.s2 with datatype=FLOAT tags(k1=v2);",
+ "create timeseries root.case2.d3.s1 with datatype=FLOAT tags(k1=v2);",
+ "insert into root.case2.d1(time, s1) values(10, 8.8);",
+ "insert into root.case2.d2(time, s1) values(10, 7.7);",
+ "insert into root.case2.d1(time, s2) values(10, 6.6);",
+ "insert into root.case2.d3(time, s1) values(10, 9.9);",
};
protected static final double DELTA = 0.001D;
@@ -527,4 +539,45 @@ public class IoTDBTagAggregationIT {
e.getMessage().contains("Having clause is not supported yet in GROUP BY TAGS query"));
}
}
+
+ @Test
+ public void testWithEmptyGroupedTimeSeries() {
+ String query = "SELECT avg(s1), avg(s2) FROM root.case2.** GROUP BY TAGS(k1)";
+ // Expected result set:
+ // +--+-----------------+-----------------+
+ // |k1| avg(s1)| avg(s2)|
+ // +--+-----------------+-----------------+
+ // |v1| 8.25| null|
+ // |v2|9.899999618530273|6.599999904632568|
+ // +--+-----------------+-----------------+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery(query)) {
+ Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+ Set<String> groups = new HashSet<>();
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(resultSet.next());
+ String tagValue = resultSet.getString("k1");
+ switch (tagValue) {
+ case "v1":
+ Assert.assertEquals(8.25D, resultSet.getDouble("avg(s1)"), E);
+ Assert.assertEquals(0.0D, resultSet.getDouble("avg(s2)"), E);
+ break;
+ case "v2":
+ Assert.assertEquals(9.899999618530273D, resultSet.getDouble("avg(s1)"), E);
+ Assert.assertEquals(6.599999904632568D, resultSet.getDouble("avg(s2)"), E);
+ break;
+ default:
+ fail("Unexpected tag value: " + tagValue);
+ }
+ groups.add(tagValue);
+ }
+ Assert.assertEquals(2, groups.size());
+ Assert.assertFalse(resultSet.next());
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 06031866fd..1c6377feb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1314,6 +1314,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
List<AggregationDescriptor> aggregationDescriptors =
node.getTagValuesToAggregationDescriptors().values().stream()
.flatMap(Collection::stream)
+ .filter(Objects::nonNull)
.collect(Collectors.toList());
long maxReturnSize =
calculateMaxAggregationResultSize(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
index af88b10cf7..e80f721677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -36,7 +36,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
-import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
public class SubPlanTypeExtractor {
@@ -86,19 +87,19 @@ public class SubPlanTypeExtractor {
@Override
public Void visitAggregation(AggregationNode node, Void context) {
- updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+ updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
return visitPlan(node, context);
}
@Override
public Void visitSlidingWindowAggregation(SlidingWindowAggregationNode node, Void context) {
- updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+ updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
return visitPlan(node, context);
}
@Override
public Void visitGroupByLevel(GroupByLevelNode node, Void context) {
- updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors());
+ updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors().stream());
return visitPlan(node, context);
}
@@ -106,7 +107,8 @@ public class SubPlanTypeExtractor {
public Void visitGroupByTag(GroupByTagNode node, Void context) {
node.getTagValuesToAggregationDescriptors()
.values()
- .forEach(this::updateTypeProviderByAggregationDescriptor);
+ .forEach(
+ v -> updateTypeProviderByAggregationDescriptor(v.stream().filter(Objects::nonNull)));
return visitPlan(node, context);
}
@@ -140,8 +142,8 @@ public class SubPlanTypeExtractor {
// end region PlanNode of last query
private void updateTypeProviderByAggregationDescriptor(
- List<? extends AggregationDescriptor> aggregationDescriptorList) {
- aggregationDescriptorList.stream()
+ Stream<? extends AggregationDescriptor> aggregationDescriptorList) {
+ aggregationDescriptorList
.flatMap(aggregationDescriptor -> aggregationDescriptor.getInputExpressions().stream())
.forEach(
expression -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index 2089696218..bb77acd56d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -209,6 +209,31 @@ public class GroupByTagNode extends MultiChildProcessNode {
return tagKeys;
}
+ /**
+ * An entry in a group's descriptor list may be null when that tag-value group has no
+ * timeSeries for the corresponding aggregation column.
+ *
+ * <p>e.g. we have following timeSeries:
+ *
+ * <ul>
+ * <li>root.sg.d1.s1(k1=v1)
+ * <li>root.sg.d1.s2(k1=v1)
+ * <li>root.sg.d2.s1(k1=v2)
+ * <li>root.sg.d3.s1(k1=v2)
+ * </ul>
+ *
+ * Then the query <code>
+ * SELECT avg(s1), avg(s2) FROM root.sg.** GROUP BY TAGS(k1)
+ * </code>will generate a {@link GroupByTagNode} with the <code>TagValuesToAggregationDescriptors
+ * </code> as below: <code>
+ * {
+ * ["v1"]: [["avg(root.sg.d1.s1)"], ["avg(root.sg.d1.s2)"]],
+ * ["v2"]: [["avg(root.sg.d2.s1)","avg(root.sg.d3.s1)"], null],
+ * }
+ * </code>
+ *
+ * <p>Callers must therefore check for (or filter out) null entries when iterating the lists.
+ */
public Map<List<String>, List<CrossSeriesAggregationDescriptor>>
getTagValuesToAggregationDescriptors() {
return tagValuesToAggregationDescriptors;
@@ -230,6 +255,8 @@ public class GroupByTagNode extends MultiChildProcessNode {
byte isNotNull = ReadWriteIOUtils.readByte(byteBuffer);
if (isNotNull == 1) {
aggregationDescriptors.add(CrossSeriesAggregationDescriptor.deserialize(byteBuffer));
+ } else {
+ aggregationDescriptors.add(null);
}
numOfAggregationDescriptors -= 1;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
index 0f35462af4..b0c0c139e5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
@@ -92,7 +92,7 @@ public class GroupByTagNodeSerdeTest {
Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
new HashMap<>();
tagValuesToAggregationDescriptors.put(
- Arrays.asList("v1", "v2"), Arrays.asList(s1MaxTime, s1Avg));
+ Arrays.asList("v1", "v2"), Arrays.asList(s1MaxTime, null, s1Avg));
GroupByTagNode expectedNode =
new GroupByTagNode(
new PlanNodeId("TestGroupByTagNode"),