You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/03/07 10:20:25 UTC

[iotdb] branch bugfix/iotdb-5619 created (now e041a4d13a)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch bugfix/iotdb-5619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at e041a4d13a [IOTDB-5619] Fix NPE in processing GroupByTagNode

This branch includes the following new commits:

     new e041a4d13a [IOTDB-5619] Fix NPE in processing GroupByTagNode

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5619] Fix NPE in processing GroupByTagNode

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/iotdb-5619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e041a4d13ade6257b28d100faedc2899448112fa
Author: EricPai <er...@hotmail.com>
AuthorDate: Tue Mar 7 18:20:08 2023 +0800

    [IOTDB-5619] Fix NPE in processing GroupByTagNode
---
 .../db/it/aggregation/IoTDBTagAggregationIT.java   | 55 +++++++++++++++++++++-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  1 +
 .../db/mpp/plan/planner/SubPlanTypeExtractor.java  | 16 ++++---
 .../planner/plan/node/process/GroupByTagNode.java  | 25 ++++++++++
 4 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
index 9ca72958e5..acd72723a0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
@@ -45,11 +45,13 @@ import static org.junit.Assert.fail;
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
 public class IoTDBTagAggregationIT {
+  private final double E = 0.00001D;
   protected static final String[] DATASET =
       new String[] {
         "CREATE DATABASE root.sg.a;",
         "CREATE DATABASE root.sg.b;",
         "CREATE DATABASE root.sg2.c;",
+        "CREATE DATABASE root.case2;",
         "create timeseries root.sg.a.d1.t with datatype=FLOAT tags(k1=k1v1, k2=k2v1, k3=k3v1);",
         "create timeseries root.sg.b.d2.t with datatype=FLOAT tags(k1=k1v1, k2=k2v2);",
         "create timeseries root.sg.a.d3.t with datatype=FLOAT tags(k1=k1v2, k2=k2v1);",
@@ -72,7 +74,17 @@ public class IoTDBTagAggregationIT {
         "insert into root.sg.b.d4(time, t) values(10, 5.4);",
         "insert into root.sg.a.d5(time, t) values(10, 6.5);",
         "insert into root.sg.b.d6(time, t) values(10, 7.6);",
-        "insert into root.sg.a.d7(time, t) values(10, 8.7);"
+        "insert into root.sg.a.d7(time, t) values(10, 8.7);",
+
+        // test multiple tag values with multiple aggregation columns
+        "create timeseries root.case2.d1.s1 with datatype=FLOAT tags(k1=v1);",
+        "create timeseries root.case2.d2.s1 with datatype=FLOAT tags(k1=v1);",
+        "create timeseries root.case2.d1.s2 with datatype=FLOAT tags(k1=v2);",
+        "create timeseries root.case2.d3.s1 with datatype=FLOAT tags(k1=v2);",
+        "insert into root.case2.d1(time, s1) values(10, 8.8);",
+        "insert into root.case2.d2(time, s1) values(10, 7.7);",
+        "insert into root.case2.d1(time, s2) values(10, 6.6);",
+        "insert into root.case2.d3(time, s1) values(10, 9.9);",
       };
 
   protected static final double DELTA = 0.001D;
@@ -527,4 +539,45 @@ public class IoTDBTagAggregationIT {
           e.getMessage().contains("Having clause is not supported yet in GROUP BY TAGS query"));
     }
   }
+
+  @Test
+  public void testWithEmptyGroupedTimeSeries() {
+    String query = "SELECT avg(s1), avg(s2) FROM root.case2.** GROUP BY TAGS(k1)";
+    // Expected result set:
+    // +--+-----------------+-----------------+
+    // |k1|          avg(s1)|          avg(s2)|
+    // +--+-----------------+-----------------+
+    // |v1|             8.25|             null|
+    // |v2|9.899999618530273|6.599999904632568|
+    // +--+-----------------+-----------------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        Set<String> groups = new HashSet<>();
+        for (int i = 0; i < 2; i++) {
+          Assert.assertTrue(resultSet.next());
+          String tagValue = resultSet.getString("k1");
+          switch (tagValue) {
+            case "v1":
+              Assert.assertEquals(8.25D, resultSet.getDouble("avg(s1)"), E);
+              Assert.assertEquals(0.0D, resultSet.getDouble("avg(s2)"), E);
+              break;
+            case "v2":
+              Assert.assertEquals(9.899999618530273D, resultSet.getDouble("avg(s1)"), E);
+              Assert.assertEquals(6.599999904632568D, resultSet.getDouble("avg(s2)"), E);
+              break;
+            default:
+              fail("Unexpected tag value: " + tagValue);
+          }
+          groups.add(tagValue);
+        }
+        Assert.assertEquals(2, groups.size());
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1f7ac0236c..5ea018928a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1314,6 +1314,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     List<AggregationDescriptor> aggregationDescriptors =
         node.getTagValuesToAggregationDescriptors().values().stream()
             .flatMap(Collection::stream)
+            .filter(Objects::nonNull)
             .collect(Collectors.toList());
     long maxReturnSize =
         calculateMaxAggregationResultSize(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
index af88b10cf7..e80f721677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -36,7 +36,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 
-import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
 
 public class SubPlanTypeExtractor {
 
@@ -86,19 +87,19 @@ public class SubPlanTypeExtractor {
 
     @Override
     public Void visitAggregation(AggregationNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
       return visitPlan(node, context);
     }
 
     @Override
     public Void visitSlidingWindowAggregation(SlidingWindowAggregationNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
       return visitPlan(node, context);
     }
 
     @Override
     public Void visitGroupByLevel(GroupByLevelNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors());
+      updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors().stream());
       return visitPlan(node, context);
     }
 
@@ -106,7 +107,8 @@ public class SubPlanTypeExtractor {
     public Void visitGroupByTag(GroupByTagNode node, Void context) {
       node.getTagValuesToAggregationDescriptors()
           .values()
-          .forEach(this::updateTypeProviderByAggregationDescriptor);
+          .forEach(
+              v -> updateTypeProviderByAggregationDescriptor(v.stream().filter(Objects::nonNull)));
       return visitPlan(node, context);
     }
 
@@ -140,8 +142,8 @@ public class SubPlanTypeExtractor {
     // end region PlanNode of last query
 
     private void updateTypeProviderByAggregationDescriptor(
-        List<? extends AggregationDescriptor> aggregationDescriptorList) {
-      aggregationDescriptorList.stream()
+        Stream<? extends AggregationDescriptor> aggregationDescriptorList) {
+      aggregationDescriptorList
           .flatMap(aggregationDescriptor -> aggregationDescriptor.getInputExpressions().stream())
           .forEach(
               expression -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index 2089696218..901fadc094 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -209,6 +209,31 @@ public class GroupByTagNode extends MultiChildProcessNode {
     return tagKeys;
   }
 
+  /**
+   * A CrossSeriesAggregationDescriptor in a returned list may be null if a tag value group
+   * contains no timeSeries for the corresponding aggregation column.
+   *
+   * <p>e.g. we have following timeSeries:
+   *
+   * <ul>
+   *   <li>root.sg.d1.s1(k1=v1)
+   *   <li>root.sg.d1.s2(k1=v1)
+   *   <li>root.sg.d2.s1(k1=v2)
+   *   <li>root.sg.d3.s1(k1=v2)
+   * </ul>
+   *
+   * Then the query <code>
+   * SELECT avg(s1), avg(s2) FROM root.sg.** GROUP BY TAGS(k1)
+   * </code>will generate a {@link GroupByTagNode} with the <code>TagValuesToAggregationDescriptors
+   * </code> as below: <code>
+   *   {
+   *     ["v1"]: [["avg(root.sg.d1.s1)"], ["avg(root.sg.d1.s2)"]],
+   *     ["v2"]: [["avg(root.sg.d2.s1)","avg(root.sg.d3.s1)"], null],
+   *   }
+   * </code>
+   *
+   * <p>So callers must handle null elements when iterating over the descriptor lists.
+   */
   public Map<List<String>, List<CrossSeriesAggregationDescriptor>>
       getTagValuesToAggregationDescriptors() {
     return tagValuesToAggregationDescriptors;