Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/10/08 07:46:31 UTC

[GitHub] [iotdb] JackieTien97 commented on a diff in pull request #6661: [IOTDB-3784] tag aggregation: basic support

JackieTien97 commented on code in PR #6661:
URL: https://github.com/apache/iotdb/pull/6661#discussion_r990581078


##########
docs/zh/UserGuide/Query-Data/Aggregate-Query.md:
##########
@@ -448,9 +448,175 @@ Total line number = 7
 It costs 0.004s
 ```
 
+## 标签聚合查询
+
+IoTDB 还支持通过 `GROUP BY TAGS` 语句根据时间序列中定义的标签的键值做聚合查询。
+
+我们先在 IoTDB 中写入如下示例数据,稍后会以这些数据为例介绍标签聚合查询。
+
+这些是某工厂 `factory1` 在多个城市的多个车间的设备温度数据, 时间范围为 [1000, 10000)。
+
+时间序列路径中的设备一级是设备唯一标识。城市信息 `city` 和车间信息 `workshop` 则被建模在该设备时间序列的标签中。
+其中,设备 `d1`、`d2` 在 `Beijing` 的 `w1` 车间, `d3`、`d4` 在 `Beijing` 的 `w2` 车间,`d5`、`d6` 在 `Shanghai` 的 `w1` 车间,`d7` 在 `Shanghai` 的 `w2` 车间。
+`d8` 和 `d9` 设备目前处于调试阶段,还未被分配到具体的城市和车间,所以其相应的标签值为空值。
+
+```SQL
+set storage group to root.factory1;
+create timeseries root.factory1.d1.temperature with datatype=FLOAT tags(city=Beijing, workshop=w1);
+create timeseries root.factory1.d2.temperature with datatype=FLOAT tags(city=Beijing, workshop=w1);
+create timeseries root.factory1.d3.temperature with datatype=FLOAT tags(city=Beijing, workshop=w2);
+create timeseries root.factory1.d4.temperature with datatype=FLOAT tags(city=Beijing, workshop=w2);
+create timeseries root.factory1.d5.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w1);
+create timeseries root.factory1.d6.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w1);
+create timeseries root.factory1.d7.temperature with datatype=FLOAT tags(city=Shanghai, workshop=w2);
+create timeseries root.factory1.d8.temperature with datatype=FLOAT;
+create timeseries root.factory1.d9.temperature with datatype=FLOAT;
+
+insert into root.factory1.d1(time, temperature) values(1000, 104.0);
+insert into root.factory1.d1(time, temperature) values(3000, 104.2);
+insert into root.factory1.d1(time, temperature) values(5000, 103.3);
+insert into root.factory1.d1(time, temperature) values(7000, 104.1);
+
+insert into root.factory1.d2(time, temperature) values(1000, 104.4);
+insert into root.factory1.d2(time, temperature) values(3000, 103.7);
+insert into root.factory1.d2(time, temperature) values(5000, 103.3);
+insert into root.factory1.d2(time, temperature) values(7000, 102.9);
+
+insert into root.factory1.d3(time, temperature) values(1000, 103.9);
+insert into root.factory1.d3(time, temperature) values(3000, 103.8);
+insert into root.factory1.d3(time, temperature) values(5000, 102.7);
+insert into root.factory1.d3(time, temperature) values(7000, 106.9);
+
+insert into root.factory1.d4(time, temperature) values(1000, 103.9);
+insert into root.factory1.d4(time, temperature) values(5000, 102.7);
+insert into root.factory1.d4(time, temperature) values(7000, 106.9);
+
+insert into root.factory1.d5(time, temperature) values(1000, 112.9);
+insert into root.factory1.d5(time, temperature) values(7000, 113.0);
+
+insert into root.factory1.d6(time, temperature) values(1000, 113.9);
+insert into root.factory1.d6(time, temperature) values(3000, 113.3);
+insert into root.factory1.d6(time, temperature) values(5000, 112.7);
+insert into root.factory1.d6(time, temperature) values(7000, 112.3);
+
+insert into root.factory1.d7(time, temperature) values(1000, 101.2);
+insert into root.factory1.d7(time, temperature) values(3000, 99.3);
+insert into root.factory1.d7(time, temperature) values(5000, 100.1);
+insert into root.factory1.d7(time, temperature) values(7000, 99.8);
+
+insert into root.factory1.d8(time, temperature) values(1000, 50.0);
+insert into root.factory1.d8(time, temperature) values(3000, 52.1);
+insert into root.factory1.d8(time, temperature) values(5000, 50.1);
+insert into root.factory1.d8(time, temperature) values(7000, 50.5);
+
+insert into root.factory1.d9(time, temperature) values(1000, 50.3);
+insert into root.factory1.d9(time, temperature) values(3000, 52.1);
+```
+
+### 单标签聚合查询
+
+用户想统统计该工厂每个地区的设备的温度的平均值,可以使用如下查询语句

Review Comment:
   ```suggestion
   用户想统计该工厂每个地区的设备的温度的平均值,可以使用如下查询语句
   ```
   Typo: `统统计` repeats a character and should be `统计`.



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java:
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.lang3.Validate;
+
+import javax.annotation.Nullable;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class GroupByTagNode extends MultiChildNode {
+
+  private final List<String> tagKeys;
+  private final Map<List<String>, List<GroupByTagAggregationDescriptor>>
+      tagValuesToAggregationDescriptors;
+  private final List<String> outputColumnNames;
+
+  // The parameter of `group by time`.
+  // Its value will be null if there is no `group by time` clause.
+  @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
+  protected Ordering scanOrder;
+
+  public GroupByTagNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder,
+      List<String> tagKeys,
+      Map<List<String>, List<GroupByTagAggregationDescriptor>> tagValuesToAggregationDescriptors,
+      List<String> outputColumnNames) {
+    super(id, children);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = Validate.notNull(scanOrder);
+    this.tagKeys = Validate.notNull(tagKeys);
+    this.tagValuesToAggregationDescriptors = Validate.notNull(tagValuesToAggregationDescriptors);
+    this.outputColumnNames = Validate.notNull(outputColumnNames);
+  }
+
+  public GroupByTagNode(
+      PlanNodeId id,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      Ordering scanOrder,
+      List<String> tagKeys,
+      Map<List<String>, List<GroupByTagAggregationDescriptor>> tagValuesToAggregationDescriptors,
+      List<String> outputColumnNames) {
+    super(id);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = Validate.notNull(scanOrder);
+    this.tagKeys = Validate.notNull(tagKeys);
+    this.tagValuesToAggregationDescriptors = Validate.notNull(tagValuesToAggregationDescriptors);
+    this.outputColumnNames = Validate.notNull(outputColumnNames);
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    this.children.add(child);
+  }
+
+  @Override
+  public PlanNode clone() {
+    // TODO: better do deep copy
+    return new GroupByTagNode(
+        getPlanNodeId(),
+        this.groupByTimeParameter,
+        this.scanOrder,
+        this.tagKeys,
+        this.tagValuesToAggregationDescriptors,
+        this.outputColumnNames);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    List<String> ret = new ArrayList<>(tagKeys);
+    ret.addAll(outputColumnNames);
+    return ret;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitGroupByTag(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    // Plan type.
+    PlanNodeType.GROUP_BY_TAG.serialize(byteBuffer);
+
+    // Tag keys.
+    ReadWriteIOUtils.writeStringList(tagKeys, byteBuffer);
+
+    // Tag values to aggregation descriptors.
+    ReadWriteIOUtils.write(tagValuesToAggregationDescriptors.size(), byteBuffer);
+    for (Entry<List<String>, List<GroupByTagAggregationDescriptor>> entry :
+        tagValuesToAggregationDescriptors.entrySet()) {
+      ReadWriteIOUtils.writeStringList(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue().size(), byteBuffer);
+      for (GroupByTagAggregationDescriptor aggregationDescriptor : entry.getValue()) {
+        if (aggregationDescriptor == null) {
+          ReadWriteIOUtils.write((byte) 0, byteBuffer);
+        } else {
+          ReadWriteIOUtils.write((byte) 1, byteBuffer);
+          aggregationDescriptor.serialize(byteBuffer);
+        }
+      }
+    }
+
+    // Output column names.
+    ReadWriteIOUtils.writeStringList(outputColumnNames, byteBuffer);
+
+    // Group by time parameter.
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByTimeParameter.serialize(byteBuffer);
+    }
+
+    // Scan order.
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    // Plan type.
+    PlanNodeType.GROUP_BY_TAG.serialize(stream);
+
+    // Tag keys.
+    ReadWriteIOUtils.writeStringList(tagKeys, stream);
+
+    // Tag values to aggregation descriptors.
+    ReadWriteIOUtils.write(tagValuesToAggregationDescriptors.size(), stream);
+    for (Entry<List<String>, List<GroupByTagAggregationDescriptor>> entry :
+        tagValuesToAggregationDescriptors.entrySet()) {
+      ReadWriteIOUtils.writeStringList(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue().size(), stream);
+      for (GroupByTagAggregationDescriptor aggregationDescriptor : entry.getValue()) {
+        if (aggregationDescriptor == null) {
+          ReadWriteIOUtils.write((byte) 0, stream);
+        } else {
+          ReadWriteIOUtils.write((byte) 1, stream);
+          aggregationDescriptor.serialize(stream);
+        }
+      }
+    }
+
+    // Output column names.
+    ReadWriteIOUtils.writeStringList(outputColumnNames, stream);
+
+    // Group by time parameter.
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, stream);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
+      groupByTimeParameter.serialize(stream);
+    }
+
+    // Scan order.
+    ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+  }
+
+  @Nullable
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
+  public Ordering getScanOrder() {
+    return scanOrder;
+  }
+
+  public List<String> getTagKeys() {
+    return tagKeys;
+  }
+
+  public Map<List<String>, List<GroupByTagAggregationDescriptor>>
+      getTagValuesToAggregationDescriptors() {
+    return tagValuesToAggregationDescriptors;
+  }
+
+  public static GroupByTagNode deserialize(ByteBuffer byteBuffer) {
+    // Tag keys.
+    List<String> tagKeys = ReadWriteIOUtils.readStringList(byteBuffer);
+
+    // Tag values to aggregation descriptors.
+    int numOfEntries = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<List<String>, List<GroupByTagAggregationDescriptor>> tagValuesToAggregationDescriptors =
+        new HashMap<>();
+    while (numOfEntries > 0) {
+      List<String> tagValues = ReadWriteIOUtils.readStringList(byteBuffer);
+      List<GroupByTagAggregationDescriptor> aggregationDescriptors = new ArrayList<>();
+      int numOfAggregationDescriptors = ReadWriteIOUtils.readInt(byteBuffer);
+      while (numOfAggregationDescriptors > 0) {
+        byte isNotNull = ReadWriteIOUtils.readByte(byteBuffer);
+        if (isNotNull == 1) {
+          aggregationDescriptors.add(GroupByTagAggregationDescriptor.deserialize(byteBuffer));
+        }
+        numOfAggregationDescriptors -= 1;
+      }
+      tagValuesToAggregationDescriptors.put(tagValues, aggregationDescriptors);
+      numOfEntries -= 1;
+    }
+
+    // Output column names.
+    List<String> outputColumnNames = ReadWriteIOUtils.readStringList(byteBuffer);
+
+    // Group by time parameter.
+    byte hasGroupByTimeParameter = ReadWriteIOUtils.readByte(byteBuffer);
+    GroupByTimeParameter groupByTimeParameter = null;
+    if (hasGroupByTimeParameter == 1) {
+      groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+    }
+
+    // Scan order.
+    Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new GroupByTagNode(
+        planNodeId,
+        groupByTimeParameter,
+        scanOrder,
+        tagKeys,
+        tagValuesToAggregationDescriptors,
+        outputColumnNames);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    GroupByTagNode that = (GroupByTagNode) o;
+    return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+        && scanOrder == that.scanOrder
+        && Objects.equals(tagKeys, that.tagKeys)
+        && Objects.equals(tagValuesToAggregationDescriptors, that.tagValuesToAggregationDescriptors)
+        && Objects.equals(outputColumnNames, that.outputColumnNames);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(),
+        groupByTimeParameter,
+        scanOrder,
+        tagKeys,
+        tagValuesToAggregationDescriptors,
+        outputColumnNames);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "GroupByTagNode-%s: Output: %s, Input: %s",
+        getPlanNodeId(),
+        getOutputColumnNames(),
+        tagValuesToAggregationDescriptors.values().stream()
+            .flatMap(list -> list.stream().map(AggregationDescriptor::getInputExpressions))
+            .collect(Collectors.toList()));
+  }
+
+  public static class GroupByTagAggregationDescriptor extends AggregationDescriptor {

Review Comment:
   This class appears to be identical to `GroupByLevelDescriptor`. I think we should keep only one of them and give it a more general name, such as `CrossSeriesAggregationDescriptor`.



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java:
##########
@@ -137,7 +137,7 @@ protected List<String> getActualAggregationNames(boolean isPartial) {
           outputAggregationNames.add(AggregationType.MAX_TIME.name().toLowerCase());
           break;
         default:
-          outputAggregationNames.add(aggregationFuncName);
+          outputAggregationNames.add(aggregationFuncName.toLowerCase());
       }
     } else {
       outputAggregationNames.add(aggregationFuncName);

Review Comment:
   Do we need to call `toLowerCase()` here? @liuminghui233
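The question is easier to weigh with the two branches side by side: the changed `default` case lowercases the name, while the `else` branch at the end of the hunk still adds `aggregationFuncName` as written. Below is a minimal sketch of that difference. The class `AggregationNameCaseSketch` and the helper `outputName` are hypothetical, and it is an assumption that the outer condition is the `isPartial` flag. The sketch passes `Locale.ROOT`, which the diff does not do; the no-argument `toLowerCase()` uses the JVM default locale, which can change the result for letters such as `I` under a Turkish locale.

```java
import java.util.Locale;

class AggregationNameCaseSketch {
  // Hypothetical helper that mirrors only the two branches visible in the hunk.
  // Assumption: the outer condition is the isPartial flag.
  static String outputName(String aggregationFuncName, boolean isPartial) {
    if (isPartial) {
      // Default case after the change: the name is lowercased.
      // Locale.ROOT keeps the result independent of the JVM's default locale.
      return aggregationFuncName.toLowerCase(Locale.ROOT);
    }
    // Unchanged else branch: the name is passed through as written.
    return aggregationFuncName;
  }

  public static void main(String[] args) {
    System.out.println(outputName("COUNT", true)); // count
    System.out.println(outputName("COUNT", false)); // COUNT
  }
}
```

If callers match output columns by name, the same function could end up under two differently cased names depending on the branch, which may be what the question is probing.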



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class TagAggregationOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final List<List<String>> groups;
+  private final List<List<Aggregator>> groupedAggregators;
+  private final List<Operator> children;
+  private final TsBlock[] inputTsBlocks;
+
+  // This fields record the to be consumed index of each tsBlock.
+  private final int[] consumedIndices;
+  private final BitMap dataReady;
+  private final BitMap childFinished;

Review Comment:
   ```suggestion
   ```
   `childFinished` is not used.



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java:
##########
@@ -0,0 +1,237 @@
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class TagAggregationOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final List<List<String>> groups;
+  private final List<List<Aggregator>> groupedAggregators;
+  private final List<Operator> children;
+  private final TsBlock[] inputTsBlocks;
+
+  // This fields record the to be consumed index of each tsBlock.
+  private final int[] consumedIndices;
+  private final BitMap dataReady;
+  private final BitMap childFinished;
+  private final TsBlockBuilder tsBlockBuilder;
+  private final long maxRetainedSize;
+  private final long childrenRetainedSize;
+  private final long maxReturnSize;
+
+  public TagAggregationOperator(
+      OperatorContext operatorContext,
+      List<List<String>> groups,
+      List<List<Aggregator>> groupedAggregators,
+      List<Operator> children,
+      long maxReturnSize) {
+    this.operatorContext = Validate.notNull(operatorContext);
+    this.groups = Validate.notNull(groups);
+    this.groupedAggregators = Validate.notNull(groupedAggregators);
+    this.children = Validate.notNull(children);
+    List<TSDataType> actualOutputColumnTypes = new ArrayList<>();
+    for (String ignored : groups.get(0)) {
+      actualOutputColumnTypes.add(TSDataType.TEXT);
+    }
+    for (int outputColumnIdx = 0;
+        outputColumnIdx < groupedAggregators.get(0).size();
+        outputColumnIdx++) {
+      for (List<Aggregator> aggregators : groupedAggregators) {
+        Aggregator aggregator = aggregators.get(outputColumnIdx);
+        if (aggregator != null) {
+          actualOutputColumnTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+          break;
+        }
+      }
+    }
+    this.tsBlockBuilder = new TsBlockBuilder(actualOutputColumnTypes);
+    // Initialize input tsblocks for each aggregator group.
+    this.inputTsBlocks = new TsBlock[children.size()];
+    this.consumedIndices = new int[children.size()];
+    this.dataReady = new BitMap(children.size());
+    this.childFinished = new BitMap(children.size());
+    this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+    this.childrenRetainedSize =
+        children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+    this.maxReturnSize = maxReturnSize;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    boolean successful = true;
+    while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull() && successful) {
+      successful = processOneRow();
+    }
+    TsBlock tsBlock = null;
+    if (tsBlockBuilder.getPositionCount() > 0) {
+      tsBlock = tsBlockBuilder.build();
+    }
+    tsBlockBuilder.reset();
+    return tsBlock;
+  }
+
+  private boolean processOneRow() {
+    for (int i = 0; i < children.size(); i++) {
+      if (dataReady.isMarked(i)) {
+        continue;
+      }
+      if (dataUnavailable(i)) {
+        // Find next non-empty tsblock of the child
+        do {
+          inputTsBlocks[i] = children.get(i).next();
+        } while (inputTsBlocks[i] != null && inputTsBlocks[i].isEmpty());

Review Comment:
   We should call `child.next()` only once in each `operator.next()`. The reason is explained in the `Operator implementation` chapter of this [feishu doc](https://apache-iotdb.feishu.cn/docs/doccnWnRJQmCWqscG0sO3V4BMnd#WzO22U).
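One possible shape for this, read as "at most one `next()` call per child per invocation", is sketched below. It is not the PR's code: `SingleNextCallSketch`, its `Child` interface, and `prepareInput` are hypothetical stand-ins, and `int[]` stands in for `TsBlock`. Instead of looping until a non-empty block arrives, the method asks each child once and returns `false` when nothing usable came back, leaving the retry to a later call.

```java
import java.util.List;

class SingleNextCallSketch {
  // Hypothetical stand-in for a child operator. A null or empty array models
  // "no usable rows yet"; the real Operator and TsBlock types are not used here.
  interface Child {
    int[] next();
  }

  private final List<Child> children;
  private final int[][] inputBlocks;

  SingleNextCallSketch(List<Child> children) {
    this.children = children;
    this.inputBlocks = new int[children.size()][];
  }

  // Intended to run once per next() call of the parent operator.
  // Returns true only when every child has a non-empty block buffered.
  boolean prepareInput() {
    for (int i = 0; i < children.size(); i++) {
      if (inputBlocks[i] != null) {
        continue; // Rows from an earlier call are still buffered.
      }
      int[] block = children.get(i).next(); // Exactly one call, no retry loop.
      if (block == null || block.length == 0) {
        return false; // Give control back; a later call tries this child again.
      }
      inputBlocks[i] = block;
    }
    return true;
  }
}
```

With this shape the parent `next()` would return `null` (or whatever rows it has already built) when `prepareInput()` reports `false`, rather than spinning inside the child.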



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TagAggregationOperator.java:
##########
@@ -0,0 +1,237 @@
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.lang3.Validate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class TagAggregationOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final List<List<String>> groups;
+  private final List<List<Aggregator>> groupedAggregators;
+  private final List<Operator> children;
+  private final TsBlock[] inputTsBlocks;
+
+  // This fields record the to be consumed index of each tsBlock.
+  private final int[] consumedIndices;
+  private final BitMap dataReady;
+  private final BitMap childFinished;
+  private final TsBlockBuilder tsBlockBuilder;
+  private final long maxRetainedSize;
+  private final long childrenRetainedSize;
+  private final long maxReturnSize;
+
+  public TagAggregationOperator(
+      OperatorContext operatorContext,
+      List<List<String>> groups,
+      List<List<Aggregator>> groupedAggregators,
+      List<Operator> children,
+      long maxReturnSize) {
+    this.operatorContext = Validate.notNull(operatorContext);
+    this.groups = Validate.notNull(groups);
+    this.groupedAggregators = Validate.notNull(groupedAggregators);
+    this.children = Validate.notNull(children);
+    List<TSDataType> actualOutputColumnTypes = new ArrayList<>();
+    for (String ignored : groups.get(0)) {
+      actualOutputColumnTypes.add(TSDataType.TEXT);
+    }
+    for (int outputColumnIdx = 0;
+        outputColumnIdx < groupedAggregators.get(0).size();
+        outputColumnIdx++) {
+      for (List<Aggregator> aggregators : groupedAggregators) {
+        Aggregator aggregator = aggregators.get(outputColumnIdx);
+        if (aggregator != null) {
+          actualOutputColumnTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+          break;
+        }
+      }
+    }
+    this.tsBlockBuilder = new TsBlockBuilder(actualOutputColumnTypes);
+    // Initialize input tsblocks for each aggregator group.
+    this.inputTsBlocks = new TsBlock[children.size()];
+    this.consumedIndices = new int[children.size()];
+    this.dataReady = new BitMap(children.size());
+    this.childFinished = new BitMap(children.size());
+    this.maxRetainedSize = children.stream().mapToLong(Operator::calculateMaxReturnSize).sum();
+    this.childrenRetainedSize =
+        children.stream().mapToLong(Operator::calculateRetainedSizeAfterCallingNext).sum();
+    this.maxReturnSize = maxReturnSize;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public TsBlock next() {
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    boolean successful = true;
+    while (System.nanoTime() - start < maxRuntime && !tsBlockBuilder.isFull() && successful) {
+      successful = processOneRow();
+    }
+    TsBlock tsBlock = null;
+    if (tsBlockBuilder.getPositionCount() > 0) {
+      tsBlock = tsBlockBuilder.build();
+    }
+    tsBlockBuilder.reset();
+    return tsBlock;
+  }
+
+  private boolean processOneRow() {
+    for (int i = 0; i < children.size(); i++) {
+      if (dataReady.isMarked(i)) {
+        continue;
+      }
+      if (dataUnavailable(i)) {
+        // Find next non-empty tsblock of the child
+        do {
+          inputTsBlocks[i] = children.get(i).next();
+        } while (inputTsBlocks[i] != null && inputTsBlocks[i].isEmpty());
+        consumedIndices[i] = 0;
+      }
+
+      // Blocked by children i
+      if (inputTsBlocks[i] == null) {
+        return false;
+      }
+      dataReady.mark(i);
+    }
+
+    TsBlock[] rowBlocks = new TsBlock[children.size()];
+    for (int i = 0; i < children.size(); i++) {
+      rowBlocks[i] = inputTsBlocks[i].getRegion(consumedIndices[i], 1);
+    }
+    for (int groupIdx = 0; groupIdx < groups.size(); groupIdx++) {
+      List<String> group = groups.get(groupIdx);
+      List<Aggregator> aggregators = groupedAggregators.get(groupIdx);
+
+      for (Aggregator aggregator : aggregators) {
+        if (aggregator == null) {
+          continue;
+        }
+        aggregator.reset();
+        aggregator.processTsBlocks(rowBlocks);
+      }
+
+      TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+      timeColumnBuilder.writeLong(rowBlocks[0].getStartTime());
+      ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+
+      for (int i = 0; i < group.size(); i++) {
+        if (group.get(i) == null) {
+          columnBuilders[i].writeBinary(new Binary("NULL"));
+        } else {
+          columnBuilders[i].writeBinary(new Binary(group.get(i)));
+        }
+      }
+      for (int i = 0; i < aggregators.size(); i++) {
+        Aggregator aggregator = aggregators.get(i);
+        ColumnBuilder columnBuilder = columnBuilders[i + group.size()];
+        if (aggregator == null) {
+          columnBuilder.appendNull();
+        } else {
+          aggregator.outputResult(new ColumnBuilder[] {columnBuilder});
+        }
+      }
+      tsBlockBuilder.declarePosition();
+    }
+
+    // Reset dataReady for next iteration
+    for (int i = 0; i < children.size(); i++) {
+      consumedIndices[i]++;
+      dataReady.unmark(i);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean hasNext() {
+    for (int i = 0; i < children.size(); i++) {
+      if (dataUnavailable(i) && !children.get(i).hasNext()) {
+        return false;
+      }

Review Comment:
   One child being finished doesn't mean that all the other children are finished.
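A minimal sketch of the alternative check follows, under the assumption that the operator should keep reporting data until every child is drained. The class `AnyChildHasNextSketch`, its `Child` interface, and the `hasBufferedRows` array are hypothetical and only model the idea; the row-assembly logic would also have to tolerate children that have already finished, which this sketch does not cover.

```java
import java.util.List;

class AnyChildHasNextSketch {
  // Hypothetical stand-in for the child operator interface.
  interface Child {
    boolean hasNext();
  }

  // hasBufferedRows[i] is true when rows already fetched from child i are unconsumed.
  static boolean hasNext(List<Child> children, boolean[] hasBufferedRows) {
    for (int i = 0; i < children.size(); i++) {
      if (hasBufferedRows[i] || children.get(i).hasNext()) {
        return true; // At least one child can still contribute rows.
      }
    }
    return false; // Every child is drained and nothing is buffered.
  }
}
```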



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java:
##########
@@ -610,6 +612,23 @@ private boolean shouldUseNaiveAggregation(PlanNode root) {
     return false;
   }
 
+  @Override
+  public PlanNode visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
+    List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
+    Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+    GroupByTagNode newRoot = (GroupByTagNode) root.clone();
+    for (Entry<TRegionReplicaSet, List<SeriesAggregationSourceNode>> entry :
+        sourceGroup.entrySet()) {
+      for (SeriesAggregationSourceNode node : entry.getValue()) {
+        newRoot.addChild(node);
+      }
+    }
+
+    return newRoot;
+  }

Review Comment:
   It seems that this does not do a partial aggregation on each DataNode first and then a final aggregation on the root DataNode. You can refer to `groupSourcesForGroupByLevel` and `calculateGroupByLevelNodeAttributes` in `visitGroupByLevel` of class `SourceRewriter`; `GroupByLevelNode` already implements this optimization.
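The partial-then-final idea can be illustrated for `avg` with a small standalone sketch. It is not IoTDB's planner code: `TwoStageAvgSketch`, `PartialAvg`, `partial`, and `finalAvg` are hypothetical names. Each node reduces its local values to a mergeable `(sum, count)` state, and the root combines those states, so only one small record per node has to cross the network.

```java
import java.util.Arrays;
import java.util.List;

class TwoStageAvgSketch {
  // Partial state for AVG: a sum and a count, which can be merged across nodes.
  static final class PartialAvg {
    final double sum;
    final long count;

    PartialAvg(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // Stage 1, run on each data node over its local values.
  static PartialAvg partial(double[] localValues) {
    double sum = 0;
    for (double v : localValues) {
      sum += v;
    }
    return new PartialAvg(sum, localValues.length);
  }

  // Stage 2, run once on the root node over the partial states.
  static double finalAvg(List<PartialAvg> partials) {
    double sum = 0;
    long count = 0;
    for (PartialAvg p : partials) {
      sum += p.sum;
      count += p.count;
    }
    return sum / count;
  }

  public static void main(String[] args) {
    PartialAvg nodeA = partial(new double[] {1, 2, 3});
    PartialAvg nodeB = partial(new double[] {5});
    System.out.println(finalAvg(Arrays.asList(nodeA, nodeB))); // 2.75
  }
}
```

Averaging the two per-node averages directly would give 3.5 here instead of 2.75, which is why the partial state carries the count alongside the sum.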


