You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 11:13:42 UTC

[iotdb] branch groupbylevelOperator created (now 11ef91e384)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch groupbylevelOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 11ef91e384 TODO implement AggregationColumn

This branch includes the following new commits:

     new 9e880f663e implement group by level operator
     new 11ef91e384 TODO implement AggregationColumn

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/02: TODO implement AggregationColumn

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch groupbylevelOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11ef91e38471b3c12ca587193d43892de6254d3d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Apr 20 15:05:34 2022 +0800

    TODO implement AggregationColumn
---
 .../apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java    | 2 +-
 .../iotdb/tsfile/read/common/block/column/AggregationColumn.java      | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 7ca604ba62..13d43c0a12 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -114,7 +114,7 @@ public class GroupByLevelOperator implements ProcessOperator {
         aggregateResultList[j].reset();
         // merge results
         for (int columnIndex : this.groupedColumns.get(j)) {
-          aggregateResultList[j].merge(inputTsBlock.getColumn(columnIndex).getObject(i));
+          aggregateResultList[j].merge(inputTsBlock.getColumn(columnIndex));
         }
         columnBuilders[j].writeObject(aggregateResultList[i].getResult());
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/AggregationColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/AggregationColumn.java
new file mode 100644
index 0000000000..ca54112a82
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/AggregationColumn.java
@@ -0,0 +1,4 @@
+package org.apache.iotdb.tsfile.read.common.block.column;
+
+// TODO implement AggregationColumn
+public abstract class AggregationColumn implements Column {}


[iotdb] 01/02: implement group by level operator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch groupbylevelOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e880f663e28dce8a6658e452d050f88c30f3766
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Apr 20 14:56:49 2022 +0800

    implement group by level operator
---
 .../mpp/operator/process/GroupByLevelOperator.java | 97 ++++++++++++++++++++--
 .../db/mpp/operator/process/TimeJoinOperator.java  | 38 ++++++---
 .../plan/node/process/GroupByLevelNode.java        |  4 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |  1 +
 4 files changed, 118 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
index 5f87d429a0..7ca604ba62 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/GroupByLevelOperator.java
@@ -18,40 +18,123 @@
  */
 package org.apache.iotdb.db.mpp.operator.process;
 
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * GroupByLevelOperator is a one-to-one operator, which accepts one TsBlock as input, groups its
+ * columns by the given levels and column names, and returns the grouped TsBlock.
+ */
 public class GroupByLevelOperator implements ProcessOperator {
 
+  private final OperatorContext operatorContext;
+  Operator child;
+
+  /**
+   * Indicates which input columns each output column is calculated from. For example, if
+   * groupedColumns is [[0,2], [1,3]], the first output column is calculated from the 0th and 2nd
+   * columns of inputTsBlock.
+   */
+  private final List<List<Integer>> groupedColumns;
+  /**
+   * The aggregation type of each group in groupedColumns; the size of aggregationTypes should be
+   * equal to the size of groupedColumns.
+   */
+  private final List<AggregationType> aggregationTypes;
+  /**
+   * The data type of each group in groupedColumns, which is derived from the AggregationType and
+   * the data type of the seriesPath.
+   */
+  private final List<TSDataType> dataTypes;
+
+  /** Used to calculate result of output TsBlock. */
+  private AggregateResult[] aggregateResultList;
+
+  private TsBlockBuilder outputTsBlockBuilder;
+
+  GroupByLevelOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      List<List<Integer>> groupedColumns,
+      List<AggregationType> aggregationTypes,
+      List<TSDataType> dataTypes) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.groupedColumns = groupedColumns;
+    this.aggregationTypes = aggregationTypes;
+    this.dataTypes = dataTypes;
+    this.aggregateResultList = new AggregateResult[groupedColumns.size()];
+    for (int i = 0; i < aggregateResultList.length; i++) {
+      // Since we don't calculate from raw data, the ascending order doesn't matter.
+      this.aggregateResultList[i] =
+          AggregateResultFactory.getAggrResultByType(
+              aggregationTypes.get(i), dataTypes.get(i), true);
+    }
+    outputTsBlockBuilder = new TsBlockBuilder(dataTypes);
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    ListenableFuture<Void> blocked = child.isBlocked();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    return NOT_BLOCKED;
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    TsBlock inputTsBlock = child.next();
+
+    outputTsBlockBuilder.reset();
+    TimeColumnBuilder timeColumnBuilder = outputTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = outputTsBlockBuilder.getValueColumnBuilders();
+    for (int i = 0; i < inputTsBlock.getPositionCount(); i++) {
+      timeColumnBuilder.writeLong(inputTsBlock.getTimeColumn().getLong(i));
+
+      for (int j = 0; j < aggregateResultList.length; j++) {
+        aggregateResultList[j].reset();
+        // merge results
+        for (int columnIndex : this.groupedColumns.get(j)) {
+          aggregateResultList[j].merge(inputTsBlock.getColumn(columnIndex).getObject(i));
+        }
+        columnBuilders[j].writeObject(aggregateResultList[i].getResult());
+      }
+    }
+
+    return outputTsBlockBuilder.build();
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return child.hasNext();
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
   }
 
   @Override
-  public boolean isFinished() {
-    return false;
+  public boolean isFinished() throws IOException {
+    return child.isFinished();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index c5fbaf10b5..8d1e07abc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -40,17 +40,24 @@ public class TimeJoinOperator implements ProcessOperator {
 
   private final List<Operator> children;
 
-  private final int inputCount;
+  private final int inputOperatorsNum;
 
+  /** TsBlocks from the child operators. Only one TsBlock is cached per child for now. */
   private final TsBlock[] inputTsBlocks;
 
+  /** For each TsBlock in inputTsBlocks, inputIndex holds its current row index. */
   private final int[] inputIndex;
 
+  /**
+   * noMoreTsBlocks[i] is true when there are no more TsBlocks from the ith child operator. If
+   * all elements in noMoreTsBlocks[] are true and inputTsBlocks[] are consumed completely, this
+   * operator is finished.
+   */
   private final boolean[] noMoreTsBlocks;
 
   private final TimeSelector timeSelector;
 
-  private final int columnCount;
+  private final int outputColumnCount;
 
   /**
    * this field indicates each data type for output columns(not including time column) of
@@ -67,12 +74,13 @@ public class TimeJoinOperator implements ProcessOperator {
       List<TSDataType> dataTypes) {
     this.operatorContext = operatorContext;
     this.children = children;
-    this.inputCount = children.size();
-    this.inputTsBlocks = new TsBlock[this.inputCount];
-    this.inputIndex = new int[this.inputCount];
-    this.noMoreTsBlocks = new boolean[this.inputCount];
-    this.timeSelector = new TimeSelector(this.inputCount << 1, OrderBy.TIMESTAMP_ASC == mergeOrder);
-    this.columnCount = dataTypes.size();
+    this.inputOperatorsNum = children.size();
+    this.inputTsBlocks = new TsBlock[this.inputOperatorsNum];
+    this.inputIndex = new int[this.inputOperatorsNum];
+    this.noMoreTsBlocks = new boolean[this.inputOperatorsNum];
+    this.timeSelector =
+        new TimeSelector(this.inputOperatorsNum << 1, OrderBy.TIMESTAMP_ASC == mergeOrder);
+    this.outputColumnCount = dataTypes.size();
     this.dataTypes = dataTypes;
   }
 
@@ -83,7 +91,7 @@ public class TimeJoinOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    for (int i = 0; i < inputCount; i++) {
+    for (int i = 0; i < inputOperatorsNum; i++) {
       if (!noMoreTsBlocks[i] && empty(i)) {
         ListenableFuture<Void> blocked = children.get(i).isBlocked();
         if (!blocked.isDone()) {
@@ -100,7 +108,7 @@ public class TimeJoinOperator implements ProcessOperator {
     // TsBlocks
     long currentEndTime = 0;
     boolean init = false;
-    for (int i = 0; i < inputCount; i++) {
+    for (int i = 0; i < inputOperatorsNum; i++) {
       if (!noMoreTsBlocks[i] && empty(i)) {
         inputIndex[i] = 0;
         inputTsBlocks[i] = children.get(i).next();
@@ -137,7 +145,7 @@ public class TimeJoinOperator implements ProcessOperator {
 
     tsBlockBuilder.buildValueColumnBuilders(dataTypes);
 
-    for (int i = 0, column = 0; i < inputCount; i++) {
+    for (int i = 0, column = 0; i < inputOperatorsNum; i++) {
       TsBlock block = inputTsBlocks[i];
       TimeColumn timeColumn = block.getTimeColumn();
       int valueColumnCount = block.getValueColumnCount();
@@ -158,7 +166,7 @@ public class TimeJoinOperator implements ProcessOperator {
     if (finished) {
       return false;
     }
-    for (int i = 0; i < inputCount; i++) {
+    for (int i = 0; i < inputOperatorsNum; i++) {
       if (!empty(i)) {
         return true;
       } else if (!noMoreTsBlocks[i]) {
@@ -185,7 +193,7 @@ public class TimeJoinOperator implements ProcessOperator {
       return true;
     }
     finished = true;
-    for (int i = 0; i < columnCount; i++) {
+    for (int i = 0; i < outputColumnCount; i++) {
       // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i]
       if (!noMoreTsBlocks[i] || !empty(i)) {
         finished = false;
@@ -195,6 +203,10 @@ public class TimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  /**
+   * Returns true if the TsBlock at columnIndex is null or all of its rows have been consumed;
+   * otherwise returns false.
+   */
   private boolean empty(int columnIndex) {
     return inputTsBlocks[columnIndex] == null
         || inputTsBlocks[columnIndex].getPositionCount() == inputIndex[columnIndex];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index dc06dec753..9d786a4103 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -57,11 +57,11 @@ import java.util.stream.Collectors;
  */
 public class GroupByLevelNode extends ProcessNode implements IOutputPlanNode {
 
-  private final int[] groupByLevels;
+  private PlanNode child;
 
   private final Map<ColumnHeader, ColumnHeader> groupedPathMap;
 
-  private PlanNode child;
+  private final int[] groupByLevels;
 
   private final List<ColumnHeader> columnHeaders;
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 256b95498d..e5f768affb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -63,6 +63,7 @@ public class TsBlock {
 
   private final Column[] valueColumns;
 
+  /** The number of rows in the current TsBlock. */
   private final int positionCount;
 
   private volatile long retainedSizeInBytes = -1;