You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/09 12:43:21 UTC

[iotdb] branch aggregationOp created (now a34d51ee28)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch aggregationOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at a34d51ee28 add some tests

This branch includes the following new commits:

     new 1938ed5558 add aggregate Operator
     new 033b38ca0b add RawDataAggregateOperator
     new a34d51ee28 add some tests

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/03: add some tests

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregationOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a34d51ee2839cd8c579ce6353b4133ae8410a838
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon May 9 20:42:45 2022 +0800

    add some tests
---
 .../iotdb/db/mpp/aggregation/AvgAccumulator.java   |   7 +
 .../iotdb/db/mpp/aggregation/CountAccumulator.java |   4 +
 .../db/mpp/aggregation/ExtremeAccumulator.java     |   3 +
 .../db/mpp/aggregation/FirstValueAccumulator.java  |   3 +
 .../db/mpp/aggregation/LastValueAccumulator.java   |   3 +
 .../db/mpp/aggregation/MaxTimeAccumulator.java     |   4 +
 .../db/mpp/aggregation/MaxValueAccumulator.java    |   3 +
 .../db/mpp/aggregation/MinTimeAccumulator.java     |   3 +
 .../db/mpp/aggregation/MinValueAccumulator.java    |  14 +
 .../iotdb/db/mpp/aggregation/SumAccumulator.java   |  17 ++
 .../execution/operator/AggregateOperatorTest.java  | 319 +++++++++++++++++++++
 .../operator/RawDataAggregateOperator.java         |  21 ++
 12 files changed, 401 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
index 3c726f9603..28ef1e79ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -35,6 +35,7 @@ public class AvgAccumulator implements Accumulator {
   private TSDataType seriesDataType;
   private long countValue;
   private double sumValue;
+  private boolean initResult = false;
 
   public AvgAccumulator(TSDataType seriesDataType) {
     this.seriesDataType = seriesDataType;
@@ -67,6 +68,9 @@ public class AvgAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 2, "partialResult of Avg should be 2");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     countValue += partialResult[0].getLong(0);
     sumValue += partialResult[1].getDouble(0);
   }
@@ -130,6 +134,7 @@ public class AvgAccumulator implements Accumulator {
         break;
       }
       if (!column[1].isNull(i)) {
+        initResult = true;
         countValue++;
         sumValue += column[1].getInt(i);
       }
@@ -144,6 +149,7 @@ public class AvgAccumulator implements Accumulator {
         break;
       }
       if (!column[1].isNull(i)) {
+        initResult = true;
         countValue++;
         sumValue += column[1].getLong(i);
       }
@@ -158,6 +164,7 @@ public class AvgAccumulator implements Accumulator {
         break;
       }
       if (!column[1].isNull(i)) {
+        initResult = true;
         countValue++;
         sumValue += column[1].getFloat(i);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
index c2825b00bb..4750cdf01e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
@@ -31,6 +31,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 public class CountAccumulator implements Accumulator {
 
   private long countValue = 0;
+  private boolean initResult = false;
 
   public CountAccumulator() {}
 
@@ -53,6 +54,9 @@ public class CountAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 1, "partialResult of Count should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     countValue += partialResult[0].getLong(0);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
index e70a7f484a..78e7c986c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -68,6 +68,9 @@ public class ExtremeAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 1, "partialResult of ExtremeValue should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     switch (seriesDataType) {
       case INT32:
         updateIntResult(partialResult[0].getInt(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index 45bce52036..908bbf4605 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -73,6 +73,9 @@ public class FirstValueAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 2, "partialResult of FirstValue should be 2");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     switch (seriesDataType) {
       case INT32:
         updateIntFirstValue(partialResult[0].getInt(0), partialResult[1].getLong(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index db418a490e..f85dd8f5be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -72,6 +72,9 @@ public class LastValueAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 2, "partialResult of LastValue should be 2");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     switch (seriesDataType) {
       case INT32:
         updateIntLastValue(partialResult[0].getInt(0), partialResult[1].getLong(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
index ce47210337..cdd2dfd563 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
@@ -30,6 +30,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 public class MaxTimeAccumulator implements Accumulator {
 
   protected long maxTime = Long.MIN_VALUE;
+  private boolean initResult = false;
 
   public MaxTimeAccumulator() {}
 
@@ -49,6 +50,9 @@ public class MaxTimeAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 1, "partialResult of MaxTime should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     updateMaxTime(partialResult[0].getLong(0));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
index c07480a13d..606fce0a8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -69,6 +69,9 @@ public class MaxValueAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 1, "partialResult of MaxValue should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     switch (seriesDataType) {
       case INT32:
         updateIntResult(partialResult[0].getInt(0));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
index a6af85093d..6969e82aa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -51,6 +51,9 @@ public class MinTimeAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 1, "partialResult of MinTime should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     updateMinTime(partialResult[0].getLong(0));
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
index 8b24de07ea..6450e32ebc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -69,6 +69,9 @@ public class MinValueAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 1, "partialResult of MinValue should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
     switch (seriesDataType) {
       case INT32:
         updateIntResult(partialResult[0].getInt(0));
@@ -116,6 +119,9 @@ public class MinValueAccumulator implements Accumulator {
   // finalResult should be single column, like: | finalCountValue |
   @Override
   public void setFinal(Column finalResult) {
+    if (finalResult.isNull(0)) {
+      return;
+    }
     minResult.setObject(finalResult.getObject(0));
   }
 
@@ -123,6 +129,10 @@ public class MinValueAccumulator implements Accumulator {
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
     checkArgument(columnBuilders.length == 1, "partialResult of MinValue should be 1");
+    if (!initResult) {
+      columnBuilders[0].appendNull();
+      return;
+    }
     switch (seriesDataType) {
       case INT32:
         columnBuilders[0].writeInt(minResult.getInt());
@@ -146,6 +156,10 @@ public class MinValueAccumulator implements Accumulator {
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
+    if (!initResult) {
+      columnBuilder.appendNull();
+      return;
+    }
     switch (seriesDataType) {
       case INT32:
         columnBuilder.writeInt(minResult.getInt());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
index 132fb017a9..5672e970fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -34,6 +34,7 @@ public class SumAccumulator implements Accumulator {
 
   private TSDataType seriesDataType;
   private double sumValue = 0;
+  private boolean initResult = false;
 
   public SumAccumulator(TSDataType seriesDataType) {
     this.seriesDataType = seriesDataType;
@@ -67,6 +68,11 @@ public class SumAccumulator implements Accumulator {
   @Override
   public void addIntermediate(Column[] partialResult) {
     checkArgument(partialResult.length == 1, "partialResult of Sum should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
+    // A non-null partial sum counts as data, otherwise the output methods would emit null.
+    initResult = true;
     sumValue += partialResult[0].getDouble(0);
   }
 
@@ -83,6 +89,11 @@ public class SumAccumulator implements Accumulator {
   @Override
   public void setFinal(Column finalResult) {
     reset();
+    if (finalResult.isNull(0)) {
+      return;
+    }
+    // Mark the accumulator as holding a value so outputFinal writes it instead of null.
+    initResult = true;
     sumValue = finalResult.getDouble(0);
   }
 
@@ -90,11 +101,21 @@ public class SumAccumulator implements Accumulator {
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
     checkArgument(columnBuilders.length == 1, "partialResult of Sum should be 1");
+    if (!initResult) {
+      // Nothing was accumulated: emit exactly one null and do not also write sumValue.
+      columnBuilders[0].appendNull();
+      return;
+    }
     columnBuilders[0].writeDouble(sumValue);
   }
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
+    if (!initResult) {
+      // Nothing was accumulated: emit exactly one null and do not also write sumValue.
+      columnBuilder.appendNull();
+      return;
+    }
     columnBuilder.writeDouble(sumValue);
   }
 
 
@@ -126,6 +139,7 @@ public class SumAccumulator implements Accumulator {
         break;
       }
       if (!column[1].isNull(i)) {
+        initResult = true;
         sumValue += column[1].getInt(i);
       }
     }
@@ -139,6 +153,7 @@ public class SumAccumulator implements Accumulator {
         break;
       }
       if (!column[1].isNull(i)) {
+        initResult = true;
         sumValue += column[1].getLong(i);
       }
     }
@@ -152,6 +167,7 @@ public class SumAccumulator implements Accumulator {
         break;
       }
       if (!column[1].isNull(i)) {
+        initResult = true;
         sumValue += column[1].getFloat(i);
       }
     }
@@ -165,6 +181,7 @@ public class SumAccumulator implements Accumulator {
         break;
       }
       if (!column[1].isNull(i)) {
+        initResult = true;
         sumValue += column[1].getDouble(i);
       }
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregateOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregateOperatorTest.java
new file mode 100644
index 0000000000..39e62f7c02
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregateOperatorTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.aggregation.Accumulator;
+import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregateOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+
+public class AggregateOperatorTest {
+
+  private static final String AGGREGATE_OPERATOR_TEST_SG = "root.AggregateOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+  private ExecutorService instanceNotificationExecutor;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, AGGREGATE_OPERATOR_TEST_SG);
+    this.instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+    instanceNotificationExecutor.shutdown();
+  }
+
+  /** Try to aggregate unary intermediate result of one time series without group by interval. */
+  @Test
+  public void testAggregateIntermediateResult1() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.COUNT);
+    aggregationTypes.add(AggregationType.SUM);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < aggregationTypes.size(); i++) {
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(1, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    AggregateOperator aggregateOperator =
+        initAggregateOperator(aggregationTypes, null, inputLocations);
+    int count = 0;
+    while (aggregateOperator.hasNext()) {
+      TsBlock resultTsBlock = aggregateOperator.next();
+      assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
+      assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
+      assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(499, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  /** Try to aggregate binary intermediate result of one time series without group by interval. */
+  @Test
+  public void testAggregateIntermediateResult2() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.AVG);
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < aggregationTypes.size(); i++) {
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(
+          new InputLocation[] {new InputLocation(0, 2 * i), new InputLocation(0, 2 * i + 1)});
+      inputLocationForOneAggregator.add(
+          new InputLocation[] {new InputLocation(1, 2 * i), new InputLocation(1, 2 * i + 1)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    AggregateOperator aggregateOperator =
+        initAggregateOperator(aggregationTypes, null, inputLocations);
+    int count = 0;
+    while (aggregateOperator.hasNext()) {
+      TsBlock resultTsBlock = aggregateOperator.next();
+      assertEquals(13049.5, resultTsBlock.getColumn(0).getDouble(0), 0.001);
+      assertEquals(20000, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(10499, resultTsBlock.getColumn(2).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testGroupByIntermediateResult1() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {100, 100, 100, 100},
+          {2004950, 2014950, 624950, 834950},
+          {0, 100, 200, 300},
+          {99, 199, 299, 399},
+          {20099, 20199, 10259, 10379},
+          {20000, 20100, 260, 380}
+        };
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.COUNT);
+    aggregationTypes.add(AggregationType.SUM);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < aggregationTypes.size(); i++) {
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(1, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    AggregateOperator aggregateOperator =
+        initAggregateOperator(aggregationTypes, groupByTimeParameter, inputLocations);
+    int count = 0;
+    while (aggregateOperator.hasNext()) {
+      TsBlock resultTsBlock = aggregateOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][count], resultTsBlock.getColumn(0).getLong(0));
+      assertEquals(result[1][count], resultTsBlock.getColumn(1).getDouble(0), 0.0001);
+      assertEquals(result[2][count], resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(result[3][count], resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(result[4][count], resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(result[5][count], resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  @Test
+  public void testGroupByIntermediateResult2() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {20000, 20100, 10200, 10300},
+          {20099, 20199, 299, 399}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < aggregationTypes.size(); i++) {
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(
+          new InputLocation[] {new InputLocation(0, 2 * i), new InputLocation(0, 2 * i + 1)});
+      inputLocationForOneAggregator.add(
+          new InputLocation[] {new InputLocation(1, 2 * i), new InputLocation(1, 2 * i + 1)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    AggregateOperator aggregateOperator =
+        initAggregateOperator(aggregationTypes, groupByTimeParameter, inputLocations);
+    int count = 0;
+    while (aggregateOperator.hasNext()) {
+      TsBlock resultTsBlock = aggregateOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][count], resultTsBlock.getColumn(1).getInt(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  /**
+   * @param aggregationTypes Aggregation function used in test
+   * @param groupByTimeParameter group-by-time window setting, or null for no group by interval
+   * @param inputLocations each inputLocation is used in one aggregator
+   */
+  private AggregateOperator initAggregateOperator(
+      List<AggregationType> aggregationTypes,
+      GroupByTimeParameter groupByTimeParameter,
+      List<List<InputLocation[]>> inputLocations)
+      throws IllegalPathException {
+    // Use the executor created in setUp() (shut down in tearDown()) rather than a new local
+    // pool, which would shadow the field and never be shut down.
+    // Construct operator tree
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId1, SeriesAggregateScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    fragmentInstanceContext.addOperatorContext(
+        2, planNodeId2, SeriesAggregateScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId3 = new PlanNodeId("3");
+    fragmentInstanceContext.addOperatorContext(
+        3, planNodeId3, AggregateOperator.class.getSimpleName());
+
+    MeasurementPath measurementPath1 =
+        new MeasurementPath(AGGREGATE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.PARTIAL)));
+    SeriesAggregateScanOperator seriesAggregateScanOperator1 =
+        new SeriesAggregateScanOperator(
+            planNodeId1,
+            measurementPath1,
+            Collections.singleton("sensor0"),
+            fragmentInstanceContext.getOperatorContexts().get(0),
+            aggregators,
+            null,
+            true,
+            groupByTimeParameter);
+    List<TsFileResource> seqResources1 = new ArrayList<>();
+    List<TsFileResource> unSeqResources1 = new ArrayList<>();
+    seqResources1.add(seqResources.get(0));
+    seqResources1.add(seqResources.get(1));
+    seqResources1.add(seqResources.get(3));
+    unSeqResources1.add(unSeqResources.get(0));
+    unSeqResources1.add(unSeqResources.get(1));
+    unSeqResources1.add(unSeqResources.get(3));
+    unSeqResources1.add(unSeqResources.get(5));
+    seriesAggregateScanOperator1.initQueryDataSource(
+        new QueryDataSource(seqResources1, unSeqResources1));
+
+    SeriesAggregateScanOperator seriesAggregateScanOperator2 =
+        new SeriesAggregateScanOperator(
+            planNodeId2,
+            measurementPath1,
+            Collections.singleton("sensor0"),
+            fragmentInstanceContext.getOperatorContexts().get(1),
+            aggregators,
+            null,
+            true,
+            groupByTimeParameter);
+    List<TsFileResource> seqResources2 = new ArrayList<>();
+    List<TsFileResource> unSeqResources2 = new ArrayList<>();
+    seqResources2.add(seqResources.get(2));
+    seqResources2.add(seqResources.get(4));
+    unSeqResources2.add(unSeqResources.get(2));
+    unSeqResources2.add(unSeqResources.get(4));
+    seriesAggregateScanOperator2.initQueryDataSource(
+        new QueryDataSource(seqResources2, unSeqResources2));
+
+    List<Operator> children = new ArrayList<>();
+    children.add(seriesAggregateScanOperator1);
+    children.add(seriesAggregateScanOperator2);
+
+    List<Aggregator> finalAggregators = new ArrayList<>();
+    List<Accumulator> accumulators =
+        AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true);
+    for (int i = 0; i < accumulators.size(); i++) {
+      finalAggregators.add(
+          new Aggregator(accumulators.get(i), AggregationStep.FINAL, inputLocations.get(i)));
+    }
+
+    return new AggregateOperator(
+        fragmentInstanceContext.getOperatorContexts().get(2),
+        finalAggregators,
+        children,
+        true,
+        groupByTimeParameter);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregateOperator.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregateOperator.java
new file mode 100644
index 0000000000..121959dc7f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregateOperator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+/** Empty placeholder class; it declares no members yet. */
+public class RawDataAggregateOperator {
+}


[iotdb] 01/03: add aggregate Operator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregationOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1938ed555873cfa02d1a324ba8d734e2e1899b15
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon May 9 11:37:46 2022 +0800

    add aggregate Operator
---
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  1 +
 .../operator/process/AggregateOperator.java        | 91 ++++++++++++++++++++--
 ...Operator.java => RawDataAggregateOperator.java} | 42 +++++++++-
 .../source/SeriesAggregateScanOperator.java        | 41 ++++------
 4 files changed, 139 insertions(+), 36 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index a0a7b87f1b..f083f65fe9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -105,6 +105,7 @@ public class Aggregator {
   }
 
   public void reset() {
+    timeRange = new TimeRange(0, Long.MAX_VALUE);
     accumulator.reset();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
index 71e7817b94..9926c46496 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
@@ -21,23 +21,60 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator;
+
+/**
+ * AggregateOperator aggregates intermediate aggregation results produced by its children and
+ * outputs one result per time interval. Each intermediate tsBlock input contains the result of
+ * exactly one time interval.
+ */
 public class AggregateOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Aggregator> aggregators;
   private final List<Operator> children;
 
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private ITimeRangeIterator timeRangeIterator;
+  // current interval of aggregation window [curStartTime, curEndTime)
+  private TimeRange curTimeRange;
+
   public AggregateOperator(
-      OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+      OperatorContext operatorContext,
+      List<Aggregator> aggregators,
+      List<Operator> children,
+      boolean ascending,
+      GroupByTimeParameter groupByTimeParameter) {
     this.operatorContext = operatorContext;
     this.aggregators = aggregators;
     this.children = children;
+
+    this.inputOperatorsCount = children.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
   }
 
   @Override
@@ -47,26 +84,68 @@ public class AggregateOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      ListenableFuture<Void> blocked = children.get(i).isBlocked();
+      if (!blocked.isDone()) {
+        return blocked;
+      }
+    }
+    return NOT_BLOCKED;
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    // update input tsBlock
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      inputTsBlocks[i] = children.get(i).next();
+    }
+    // consume current input tsBlocks
+    for (Aggregator aggregator : aggregators) {
+      aggregator.reset();
+      aggregator.processTsBlocks(inputTsBlocks);
+    }
+    // output result from aggregator
+    return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, curTimeRange);
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    if (!timeRangeIterator.hasNextTimeRange()) {
+      return false;
+    }
+    curTimeRange = timeRangeIterator.nextTimeRange();
+    return true;
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    for (Operator child : children) {
+      child.close();
+    }
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return !this.hasNext();
+  }
+
+  public static TsBlock updateResultTsBlockFromAggregators(
+      TsBlockBuilder tsBlockBuilder, List<Aggregator> aggregators, TimeRange curTimeRange) {
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    // Use start time of current time range as time column
+    timeColumnBuilder.writeLong(curTimeRange.getMin());
+    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    int columnIndex = 0;
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
+    }
+    tsBlockBuilder.declarePosition();
+    return tsBlockBuilder.build();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
index 71e7817b94..5fb705dc3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
@@ -16,28 +16,64 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-public class AggregateOperator implements ProcessOperator {
+import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator;
+
+/**
+ * RawDataAggregateOperator is used to process raw data tsBlock input calculating using value
+ * filter. It's possible that there is more than one tsBlock input in one time interval. And it's
+ * also possible that one tsBlock can cover multiple time intervals too.
+ */
+public class RawDataAggregateOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Aggregator> aggregators;
   private final List<Operator> children;
 
-  public AggregateOperator(
-      OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private ITimeRangeIterator timeRangeIterator;
+  // current interval of aggregation window [curStartTime, curEndTime)
+  private TimeRange curTimeRange;
+
+  public RawDataAggregateOperator(
+      OperatorContext operatorContext,
+      List<Aggregator> aggregators,
+      List<Operator> children,
+      boolean ascending,
+      GroupByTimeParameter groupByTimeParameter) {
     this.operatorContext = operatorContext;
     this.aggregators = aggregators;
     this.children = children;
+
+    this.inputOperatorsCount = children.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index fdc50a808b..f86b5e5deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregateOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
@@ -33,8 +34,6 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -101,7 +100,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
   }
 
   /**
@@ -109,7 +108,8 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
    * Aggregation query has only one time window and the result set of it does not contain a
    * timestamp, so it doesn't matter what the time range returns.
    */
-  public ITimeRangeIterator initTimeRangeIterator(GroupByTimeParameter groupByTimeParameter) {
+  public static ITimeRangeIterator initTimeRangeIterator(
+      GroupByTimeParameter groupByTimeParameter, boolean ascending) {
     if (groupByTimeParameter == null) {
       return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
     } else {
@@ -164,19 +164,19 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
 
       // 2. Calculate aggregation result based on current time window
       if (calcFromCacheData(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
       // read page data firstly
       if (readAndCalcFromPage(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
       // read chunk data secondly
       if (readAndCalcFromChunk(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
@@ -185,7 +185,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
         Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
         if (fileStatistics.getStartTime() >= curTimeRange.getMax()) {
           if (ascending) {
-            updateResultTsBlockUsingAggregateResult();
+            updateResultTsBlockFromAggregators();
             return true;
           } else {
             seriesScanUtil.skipCurrentFile();
@@ -202,35 +202,22 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
 
         // read chunk
         if (readAndCalcFromChunk(curTimeRange)) {
-          updateResultTsBlockUsingAggregateResult();
+          updateResultTsBlockFromAggregators();
           return true;
         }
       }
 
-      updateResultTsBlockUsingAggregateResult();
+      updateResultTsBlockFromAggregators();
       return true;
     } catch (IOException e) {
       throw new RuntimeException("Error while scanning the file", e);
     }
   }
 
-  private void updateResultTsBlockUsingAggregateResult() {
-    tsBlockBuilder.reset();
-    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    // Use start time of current time range as time column
-    timeColumnBuilder.writeLong(curTimeRange.getMin());
-    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    int columnIndex = 0;
-    for (Aggregator aggregator : aggregators) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
-      columnBuilder[0] = columnBuilders[columnIndex++];
-      if (columnBuilder.length > 1) {
-        columnBuilder[1] = columnBuilders[columnIndex++];
-      }
-      aggregator.outputResult(columnBuilder);
-    }
-    tsBlockBuilder.declarePosition();
-    resultTsBlock = tsBlockBuilder.build();
+  private void updateResultTsBlockFromAggregators() {
+    resultTsBlock =
+        AggregateOperator.updateResultTsBlockFromAggregators(
+            tsBlockBuilder, aggregators, curTimeRange);
     hasCachedTsBlock = true;
   }
 


[iotdb] 02/03: add RawDataAggregateOperator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregationOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 033b38ca0b0f736a7eaef048130bc9f4354efe2f
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon May 9 17:16:24 2022 +0800

    add RawDataAggregateOperator
---
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  42 ++++----
 .../operator/process/RawDataAggregateOperator.java | 115 ++++++++++++++++++---
 .../source/SeriesAggregateScanOperator.java        |  39 ++-----
 3 files changed, 133 insertions(+), 63 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index f083f65fe9..d38cbe88c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -55,32 +55,38 @@ public class Aggregator {
     this.inputLocationList = inputLocationList;
   }
 
-  // Used for SeriesAggregateScanOperator
+  // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
   public void processTsBlock(TsBlock tsBlock) {
     checkArgument(
-        step.isInputRaw(), "Step in SeriesAggregateScanOperator can only process raw input");
+        step.isInputRaw(),
+        "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
     // TODO Aligned TimeSeries
-    accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+    if (inputLocationList == null) {
+      accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+    } else {
+      for (InputLocation[] inputLocations : inputLocationList) {
+        checkArgument(
+            inputLocations[0].getTsBlockIndex() == 1,
+            "RawDataAggregateOperator can only process one tsBlock input.");
+        Column[] timeValueColumn = new Column[2];
+        timeValueColumn[0] = tsBlock.getTimeColumn();
+        timeValueColumn[1] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+        accumulator.addInput(timeValueColumn, timeRange);
+      }
+    }
   }
 
-  // Used for aggregateOperator
+  // Used for AggregateOperator
   public void processTsBlocks(TsBlock[] tsBlock) {
+    checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input");
     for (InputLocation[] inputLocations : inputLocationList) {
-      if (step.isInputRaw()) {
-        TsBlock rawTsBlock = tsBlock[inputLocations[0].getTsBlockIndex()];
-        Column[] timeValueColumn = new Column[2];
-        timeValueColumn[0] = rawTsBlock.getTimeColumn();
-        timeValueColumn[1] = rawTsBlock.getColumn(inputLocations[0].getValueColumnIndex());
-        accumulator.addInput(timeValueColumn, timeRange);
-      } else {
-        Column[] columns = new Column[inputLocations.length];
-        for (int i = 0; i < inputLocations.length; i++) {
-          columns[i] =
-              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
-                  inputLocations[i].getValueColumnIndex());
-        }
-        accumulator.addIntermediate(columns);
+      Column[] columns = new Column[inputLocations.length];
+      for (int i = 0; i < inputLocations.length; i++) {
+        columns[i] =
+            tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                inputLocations[i].getValueColumnIndex());
       }
+      accumulator.addIntermediate(columns);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
index 5fb705dc3a..bbe6e6f041 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -41,33 +42,38 @@ import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateS
  * RawDataAggregateOperator is used to process raw data tsBlock input calculating using value
  * filter. It's possible that there is more than one tsBlock input in one time interval. And it's
  * also possible that one tsBlock can cover multiple time intervals too.
+ *
+ * <p>Since raw data query with value filter is processed by FilterOperator above TimeJoinOperator,
+ * we can treat RawDataAggregateOperator as a one-to-one (one input, one output) operator.
+ *
+ * <p>Each call to next() returns the aggregation result of one time interval.
  */
 public class RawDataAggregateOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   private final List<Aggregator> aggregators;
-  private final List<Operator> children;
-
-  private final int inputOperatorsCount;
-  private final TsBlock[] inputTsBlocks;
-  private final TsBlockBuilder tsBlockBuilder;
-
+  private final Operator child;
+  private final boolean ascending;
   private ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
 
+  private TsBlock preCachedData;
+
+  // Used for building the result tsBlock
+  private final TsBlockBuilder tsBlockBuilder;
+
   public RawDataAggregateOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
-      List<Operator> children,
+      Operator child,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter) {
     this.operatorContext = operatorContext;
     this.aggregators = aggregators;
-    this.children = children;
+    this.child = child;
+    this.ascending = ascending;
 
-    this.inputOperatorsCount = children.size();
-    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
@@ -83,26 +89,105 @@ public class RawDataAggregateOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    return child.isBlocked();
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    // 1. Clear previous aggregation result
+    for (Aggregator aggregator : aggregators) {
+      aggregator.reset();
+      aggregator.setTimeRange(curTimeRange);
+    }
+
+    // 2. Calculate aggregation result based on current time window
+    while (!calcFromCacheData(curTimeRange)) {
+      preCachedData = child.next();
+    }
+
+    // 3. Update result using aggregators
+    return AggregateOperator.updateResultTsBlockFromAggregators(
+        tsBlockBuilder, aggregators, curTimeRange);
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    if (!timeRangeIterator.hasNextTimeRange()) {
+      return false;
+    }
+    curTimeRange = timeRangeIterator.nextTimeRange();
+    return true;
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return !this.hasNext();
+  }
+
+  /** @return true if the result of the current time range has already been obtained */
+  private boolean calcFromCacheData(TimeRange curTimeRange) {
+    // only process the cached tsBlock if it contains points in the current interval
+    if (preCachedData != null && satisfied(preCachedData, curTimeRange, ascending)) {
+      // skip points that cannot be calculated
+      preCachedData = skipOutOfTimeRangePoints(preCachedData, curTimeRange, ascending);
+
+      for (Aggregator aggregator : aggregators) {
+        // current agg method has been calculated
+        if (aggregator.hasFinalResult()) {
+          continue;
+        }
+
+        aggregator.processTsBlock(preCachedData);
+      }
+    }
+    // The result is calculated from the cache
+    return (preCachedData != null
+            && (ascending
+                ? preCachedData.getEndTime() >= curTimeRange.getMax()
+                : preCachedData.getStartTime() < curTimeRange.getMin()))
+        || isEndCalc(aggregators);
+  }
+
+  // skip points that cannot be calculated
+  public static TsBlock skipOutOfTimeRangePoints(
+      TsBlock tsBlock, TimeRange curTimeRange, boolean ascending) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
+    if (ascending) {
+      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
+        tsBlockIterator.next();
+      }
+    } else {
+      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
+        tsBlockIterator.next();
+      }
+    }
+    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
+  }
+
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
+    if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
+      return false;
+    }
+
+    return ascending
+        ? (tsBlockIterator.getEndTime() >= timeRange.getMin()
+            && tsBlockIterator.currentTime() < timeRange.getMax())
+        : (tsBlockIterator.getStartTime() < timeRange.getMax()
+            && tsBlockIterator.currentTime() >= timeRange.getMin());
+  }
+
+  public static boolean isEndCalc(List<Aggregator> aggregators) {
+    for (Aggregator aggregator : aggregators) {
+      if (!aggregator.hasFinalResult()) {
+        return false;
+      }
+    }
+    return true;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index f86b5e5deb..23c8bc28be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -44,6 +44,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.isEndCalc;
+import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.skipOutOfTimeRangePoints;
+
 /**
  * This operator is responsible to do the aggregation calculation for one series based on global
  * time range and time split parameter.
@@ -250,18 +253,18 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             && (ascending
                 ? preCachedData.getEndTime() >= curTimeRange.getMax()
                 : preCachedData.getStartTime() < curTimeRange.getMin()))
-        || isEndCalc();
+        || isEndCalc(aggregators);
   }
 
   @SuppressWarnings("squid:S3776")
   private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
+    if (tsBlock == null || !satisfied(tsBlock, curTimeRange, ascending)) {
       return;
     }
 
     // skip points that cannot be calculated
-    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange);
+    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange, ascending);
 
     for (Aggregator aggregator : aggregators) {
       // current agg method has been calculated
@@ -278,22 +281,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     }
   }
 
-  // skip points that cannot be calculated
-  private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) {
-    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
-    if (ascending) {
-      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
-        tsBlockIterator.next();
-      }
-    } else {
-      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
-        tsBlockIterator.next();
-      }
-    }
-    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
-  }
-
-  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) {
     TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
@@ -313,15 +301,6 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     return true;
   }
 
-  private boolean isEndCalc() {
-    for (Aggregator aggregator : aggregators) {
-      if (!aggregator.hasFinalResult()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException {
     while (seriesScanUtil.hasNextPage()) {
@@ -342,7 +321,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
           calcFromStatistics(pageStatistics);
           seriesScanUtil.skipCurrentPage();
-          if (isEndCalc()) {
+          if (isEndCalc(aggregators)) {
             return true;
           }
           continue;
@@ -369,7 +348,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
-      if (isEndCalc()
+      if (isEndCalc(aggregators)
           || (tsBlockIterator.hasNext()
               && (ascending
                   ? tsBlockIterator.currentTime() >= curTimeRange.getMax()