You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/03/09 01:26:45 UTC

[iotdb] branch bugfix/iotdb-5619 created (now d72bb3222c)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch bugfix/iotdb-5619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at d72bb3222c [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)

This branch includes the following new commits:

     new d72bb3222c [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch bugfix/iotdb-5619
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d72bb3222c3b88cb761599246bb4250ce3258478
Author: BaiJian <er...@hotmail.com>
AuthorDate: Wed Mar 8 13:56:04 2023 +0800

    [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)
---
 .../db/it/aggregation/IoTDBTagAggregationIT.java   | 55 +++++++++++++++++++++-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  1 +
 .../db/mpp/plan/planner/SubPlanTypeExtractor.java  | 16 ++++---
 .../planner/plan/node/process/GroupByTagNode.java  | 27 +++++++++++
 .../plan/node/process/GroupByTagNodeSerdeTest.java |  2 +-
 5 files changed, 92 insertions(+), 9 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
index 9ca72958e5..acd72723a0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
@@ -45,11 +45,13 @@ import static org.junit.Assert.fail;
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
 public class IoTDBTagAggregationIT {
+  private final double E = 0.00001D;
   protected static final String[] DATASET =
       new String[] {
         "CREATE DATABASE root.sg.a;",
         "CREATE DATABASE root.sg.b;",
         "CREATE DATABASE root.sg2.c;",
+        "CREATE DATABASE root.case2;",
         "create timeseries root.sg.a.d1.t with datatype=FLOAT tags(k1=k1v1, k2=k2v1, k3=k3v1);",
         "create timeseries root.sg.b.d2.t with datatype=FLOAT tags(k1=k1v1, k2=k2v2);",
         "create timeseries root.sg.a.d3.t with datatype=FLOAT tags(k1=k1v2, k2=k2v1);",
@@ -72,7 +74,17 @@ public class IoTDBTagAggregationIT {
         "insert into root.sg.b.d4(time, t) values(10, 5.4);",
         "insert into root.sg.a.d5(time, t) values(10, 6.5);",
         "insert into root.sg.b.d6(time, t) values(10, 7.6);",
-        "insert into root.sg.a.d7(time, t) values(10, 8.7);"
+        "insert into root.sg.a.d7(time, t) values(10, 8.7);",
+
+        // test multi value with multi aggregation column
+        "create timeseries root.case2.d1.s1 with datatype=FLOAT tags(k1=v1);",
+        "create timeseries root.case2.d2.s1 with datatype=FLOAT tags(k1=v1);",
+        "create timeseries root.case2.d1.s2 with datatype=FLOAT tags(k1=v2);",
+        "create timeseries root.case2.d3.s1 with datatype=FLOAT tags(k1=v2);",
+        "insert into root.case2.d1(time, s1) values(10, 8.8);",
+        "insert into root.case2.d2(time, s1) values(10, 7.7);",
+        "insert into root.case2.d1(time, s2) values(10, 6.6);",
+        "insert into root.case2.d3(time, s1) values(10, 9.9);",
       };
 
   protected static final double DELTA = 0.001D;
@@ -527,4 +539,45 @@ public class IoTDBTagAggregationIT {
           e.getMessage().contains("Having clause is not supported yet in GROUP BY TAGS query"));
     }
   }
+
+  @Test
+  public void testWithEmptyGroupedTimeSeries() {
+    String query = "SELECT avg(s1), avg(s2) FROM root.case2.** GROUP BY TAGS(k1)";
+    // Expected result set:
+    // +--+-----------------+-----------------+
+    // |k1|          avg(s1)|          avg(s2)|
+    // +--+-----------------+-----------------+
+    // |v1|             8.25|             null|
+    // |v2|9.899999618530273|6.599999904632568|
+    // +--+-----------------+-----------------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        Set<String> groups = new HashSet<>();
+        for (int i = 0; i < 2; i++) {
+          Assert.assertTrue(resultSet.next());
+          String tagValue = resultSet.getString("k1");
+          switch (tagValue) {
+            case "v1":
+              Assert.assertEquals(8.25D, resultSet.getDouble("avg(s1)"), E);
+              Assert.assertEquals(0.0D, resultSet.getDouble("avg(s2)"), E);
+              break;
+            case "v2":
+              Assert.assertEquals(9.899999618530273D, resultSet.getDouble("avg(s1)"), E);
+              Assert.assertEquals(6.599999904632568D, resultSet.getDouble("avg(s2)"), E);
+              break;
+            default:
+              fail("Unexpected tag value: " + tagValue);
+          }
+          groups.add(tagValue);
+        }
+        Assert.assertEquals(2, groups.size());
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 06031866fd..1c6377feb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1314,6 +1314,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     List<AggregationDescriptor> aggregationDescriptors =
         node.getTagValuesToAggregationDescriptors().values().stream()
             .flatMap(Collection::stream)
+            .filter(Objects::nonNull)
             .collect(Collectors.toList());
     long maxReturnSize =
         calculateMaxAggregationResultSize(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
index af88b10cf7..e80f721677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -36,7 +36,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 
-import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
 
 public class SubPlanTypeExtractor {
 
@@ -86,19 +87,19 @@ public class SubPlanTypeExtractor {
 
     @Override
     public Void visitAggregation(AggregationNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
       return visitPlan(node, context);
     }
 
     @Override
     public Void visitSlidingWindowAggregation(SlidingWindowAggregationNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
       return visitPlan(node, context);
     }
 
     @Override
     public Void visitGroupByLevel(GroupByLevelNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors());
+      updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors().stream());
       return visitPlan(node, context);
     }
 
@@ -106,7 +107,8 @@ public class SubPlanTypeExtractor {
     public Void visitGroupByTag(GroupByTagNode node, Void context) {
       node.getTagValuesToAggregationDescriptors()
           .values()
-          .forEach(this::updateTypeProviderByAggregationDescriptor);
+          .forEach(
+              v -> updateTypeProviderByAggregationDescriptor(v.stream().filter(Objects::nonNull)));
       return visitPlan(node, context);
     }
 
@@ -140,8 +142,8 @@ public class SubPlanTypeExtractor {
     // end region PlanNode of last query
 
     private void updateTypeProviderByAggregationDescriptor(
-        List<? extends AggregationDescriptor> aggregationDescriptorList) {
-      aggregationDescriptorList.stream()
+        Stream<? extends AggregationDescriptor> aggregationDescriptorList) {
+      aggregationDescriptorList
           .flatMap(aggregationDescriptor -> aggregationDescriptor.getInputExpressions().stream())
           .forEach(
               expression -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index 2089696218..bb77acd56d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -209,6 +209,31 @@ public class GroupByTagNode extends MultiChildProcessNode {
     return tagKeys;
   }
 
+  /**
+   * The CrossSeriesAggregationDescriptor may be null if there exists a key containing no
+   * timeSeries.
+   *
+   * <p>e.g. we have following timeSeries:
+   *
+   * <ul>
+   *   <li>root.sg.d1.s1(k1=v1)
+   *   <li>root.sg.d1.s2(k1=v1)
+   *   <li>root.sg.d2.s1(k1=v2)
+   *   <li>root.sg.d3.s1(k1=v2)
+   * </ul>
+   *
+   * Then the query <code>
+   * SELECT avg(s1), avg(s2) FROM root.sg.** GROUP BY TAGS(k1)
+   * </code>will generate a {@link GroupByTagNode} with the <code>TagValuesToAggregationDescriptors
+   * </code> as below: <code>
+   *   {
+   *     ["v1"]: [["avg(root.sg.d1.s1)"], ["avg(root.sg.d1.s2)"]],
+   *     ["v2"]: [["avg(root.sg.d2.s1)","avg(root.sg.d3.s1)"], null],
+   *   }
+   * </code>
+   *
+   * <p>So we should use it carefully with null values.
+   */
   public Map<List<String>, List<CrossSeriesAggregationDescriptor>>
       getTagValuesToAggregationDescriptors() {
     return tagValuesToAggregationDescriptors;
@@ -230,6 +255,8 @@ public class GroupByTagNode extends MultiChildProcessNode {
         byte isNotNull = ReadWriteIOUtils.readByte(byteBuffer);
         if (isNotNull == 1) {
           aggregationDescriptors.add(CrossSeriesAggregationDescriptor.deserialize(byteBuffer));
+        } else {
+          aggregationDescriptors.add(null);
         }
         numOfAggregationDescriptors -= 1;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
index 0f35462af4..b0c0c139e5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
@@ -92,7 +92,7 @@ public class GroupByTagNodeSerdeTest {
     Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
         new HashMap<>();
     tagValuesToAggregationDescriptors.put(
-        Arrays.asList("v1", "v2"), Arrays.asList(s1MaxTime, s1Avg));
+        Arrays.asList("v1", "v2"), Arrays.asList(s1MaxTime, null, s1Avg));
     GroupByTagNode expectedNode =
         new GroupByTagNode(
             new PlanNodeId("TestGroupByTagNode"),