You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2023/03/08 05:56:10 UTC

[iotdb] branch master updated: [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dc4fb11ec6 [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)
dc4fb11ec6 is described below

commit dc4fb11ec6a51a4b296ef6d2a4dea8b54d3bb62d
Author: BaiJian <er...@hotmail.com>
AuthorDate: Wed Mar 8 13:56:04 2023 +0800

    [IOTDB-5619] Fix NPE in processing GroupByTagNode (#9235)
---
 .../db/it/aggregation/IoTDBTagAggregationIT.java   | 55 +++++++++++++++++++++-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  1 +
 .../db/mpp/plan/planner/SubPlanTypeExtractor.java  | 16 ++++---
 .../planner/plan/node/process/GroupByTagNode.java  | 27 +++++++++++
 .../plan/node/process/GroupByTagNodeSerdeTest.java |  2 +-
 5 files changed, 92 insertions(+), 9 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
index 9ca72958e5..acd72723a0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBTagAggregationIT.java
@@ -45,11 +45,13 @@ import static org.junit.Assert.fail;
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
 public class IoTDBTagAggregationIT {
+  private final double E = 0.00001D;
   protected static final String[] DATASET =
       new String[] {
         "CREATE DATABASE root.sg.a;",
         "CREATE DATABASE root.sg.b;",
         "CREATE DATABASE root.sg2.c;",
+        "CREATE DATABASE root.case2;",
         "create timeseries root.sg.a.d1.t with datatype=FLOAT tags(k1=k1v1, k2=k2v1, k3=k3v1);",
         "create timeseries root.sg.b.d2.t with datatype=FLOAT tags(k1=k1v1, k2=k2v2);",
         "create timeseries root.sg.a.d3.t with datatype=FLOAT tags(k1=k1v2, k2=k2v1);",
@@ -72,7 +74,17 @@ public class IoTDBTagAggregationIT {
         "insert into root.sg.b.d4(time, t) values(10, 5.4);",
         "insert into root.sg.a.d5(time, t) values(10, 6.5);",
         "insert into root.sg.b.d6(time, t) values(10, 7.6);",
-        "insert into root.sg.a.d7(time, t) values(10, 8.7);"
+        "insert into root.sg.a.d7(time, t) values(10, 8.7);",
+
+        // test multiple tag values with multiple aggregation columns
+        "create timeseries root.case2.d1.s1 with datatype=FLOAT tags(k1=v1);",
+        "create timeseries root.case2.d2.s1 with datatype=FLOAT tags(k1=v1);",
+        "create timeseries root.case2.d1.s2 with datatype=FLOAT tags(k1=v2);",
+        "create timeseries root.case2.d3.s1 with datatype=FLOAT tags(k1=v2);",
+        "insert into root.case2.d1(time, s1) values(10, 8.8);",
+        "insert into root.case2.d2(time, s1) values(10, 7.7);",
+        "insert into root.case2.d1(time, s2) values(10, 6.6);",
+        "insert into root.case2.d3(time, s1) values(10, 9.9);",
       };
 
   protected static final double DELTA = 0.001D;
@@ -527,4 +539,45 @@ public class IoTDBTagAggregationIT {
           e.getMessage().contains("Having clause is not supported yet in GROUP BY TAGS query"));
     }
   }
+
+  @Test
+  public void testWithEmptyGroupedTimeSeries() {
+    String query = "SELECT avg(s1), avg(s2) FROM root.case2.** GROUP BY TAGS(k1)";
+    // Expected result set:
+    // +--+-----------------+-----------------+
+    // |k1|          avg(s1)|          avg(s2)|
+    // +--+-----------------+-----------------+
+    // |v1|             8.25|             null|
+    // |v2|9.899999618530273|6.599999904632568|
+    // +--+-----------------+-----------------+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try (ResultSet resultSet = statement.executeQuery(query)) {
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
+        Set<String> groups = new HashSet<>();
+        for (int i = 0; i < 2; i++) {
+          Assert.assertTrue(resultSet.next());
+          String tagValue = resultSet.getString("k1");
+          switch (tagValue) {
+            case "v1":
+              Assert.assertEquals(8.25D, resultSet.getDouble("avg(s1)"), E);
+              Assert.assertEquals(0.0D, resultSet.getDouble("avg(s2)"), E);
+              break;
+            case "v2":
+              Assert.assertEquals(9.899999618530273D, resultSet.getDouble("avg(s1)"), E);
+              Assert.assertEquals(6.599999904632568D, resultSet.getDouble("avg(s2)"), E);
+              break;
+            default:
+              fail("Unexpected tag value: " + tagValue);
+          }
+          groups.add(tagValue);
+        }
+        Assert.assertEquals(2, groups.size());
+        Assert.assertFalse(resultSet.next());
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1f7ac0236c..5ea018928a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1314,6 +1314,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     List<AggregationDescriptor> aggregationDescriptors =
         node.getTagValuesToAggregationDescriptors().values().stream()
             .flatMap(Collection::stream)
+            .filter(Objects::nonNull)
             .collect(Collectors.toList());
     long maxReturnSize =
         calculateMaxAggregationResultSize(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
index af88b10cf7..e80f721677 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -36,7 +36,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 
-import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
 
 public class SubPlanTypeExtractor {
 
@@ -86,19 +87,19 @@ public class SubPlanTypeExtractor {
 
     @Override
     public Void visitAggregation(AggregationNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
       return visitPlan(node, context);
     }
 
     @Override
     public Void visitSlidingWindowAggregation(SlidingWindowAggregationNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList());
+      updateTypeProviderByAggregationDescriptor(node.getAggregationDescriptorList().stream());
       return visitPlan(node, context);
     }
 
     @Override
     public Void visitGroupByLevel(GroupByLevelNode node, Void context) {
-      updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors());
+      updateTypeProviderByAggregationDescriptor(node.getGroupByLevelDescriptors().stream());
       return visitPlan(node, context);
     }
 
@@ -106,7 +107,8 @@ public class SubPlanTypeExtractor {
     public Void visitGroupByTag(GroupByTagNode node, Void context) {
       node.getTagValuesToAggregationDescriptors()
           .values()
-          .forEach(this::updateTypeProviderByAggregationDescriptor);
+          .forEach(
+              v -> updateTypeProviderByAggregationDescriptor(v.stream().filter(Objects::nonNull)));
       return visitPlan(node, context);
     }
 
@@ -140,8 +142,8 @@ public class SubPlanTypeExtractor {
     // end region PlanNode of last query
 
     private void updateTypeProviderByAggregationDescriptor(
-        List<? extends AggregationDescriptor> aggregationDescriptorList) {
-      aggregationDescriptorList.stream()
+        Stream<? extends AggregationDescriptor> aggregationDescriptorList) {
+      aggregationDescriptorList
           .flatMap(aggregationDescriptor -> aggregationDescriptor.getInputExpressions().stream())
           .forEach(
               expression -> {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index 2089696218..bb77acd56d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -209,6 +209,31 @@ public class GroupByTagNode extends MultiChildProcessNode {
     return tagKeys;
   }
 
+  /**
+   * A CrossSeriesAggregationDescriptor in a value list may be null when a tag-value group has no
+   * time series for the corresponding aggregation column.
+   *
+   * <p>e.g. we have following timeSeries:
+   *
+   * <ul>
+   *   <li>root.sg.d1.s1(k1=v1)
+   *   <li>root.sg.d1.s2(k1=v1)
+   *   <li>root.sg.d2.s1(k1=v2)
+   *   <li>root.sg.d3.s1(k1=v2)
+   * </ul>
+   *
+   * Then the query <code>
+   * SELECT avg(s1), avg(s2) FROM root.sg.** GROUP BY TAGS(k1)
+   * </code>will generate a {@link GroupByTagNode} with the <code>TagValuesToAggregationDescriptors
+   * </code> as below: <code>
+   *   {
+   *     ["v1"]: [["avg(root.sg.d1.s1)"], ["avg(root.sg.d1.s2)"]],
+   *     ["v2"]: [["avg(root.sg.d2.s1)","avg(root.sg.d3.s1)"], null],
+   *   }
+   * </code>
+   *
+   * <p>Callers must therefore check for null entries before using the returned descriptors.
+   */
   public Map<List<String>, List<CrossSeriesAggregationDescriptor>>
       getTagValuesToAggregationDescriptors() {
     return tagValuesToAggregationDescriptors;
@@ -230,6 +255,8 @@ public class GroupByTagNode extends MultiChildProcessNode {
         byte isNotNull = ReadWriteIOUtils.readByte(byteBuffer);
         if (isNotNull == 1) {
           aggregationDescriptors.add(CrossSeriesAggregationDescriptor.deserialize(byteBuffer));
+        } else {
+          aggregationDescriptors.add(null);
         }
         numOfAggregationDescriptors -= 1;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
index 0f35462af4..b0c0c139e5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByTagNodeSerdeTest.java
@@ -92,7 +92,7 @@ public class GroupByTagNodeSerdeTest {
     Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
         new HashMap<>();
     tagValuesToAggregationDescriptors.put(
-        Arrays.asList("v1", "v2"), Arrays.asList(s1MaxTime, s1Avg));
+        Arrays.asList("v1", "v2"), Arrays.asList(s1MaxTime, null, s1Avg));
     GroupByTagNode expectedNode =
         new GroupByTagNode(
             new PlanNodeId("TestGroupByTagNode"),