You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/05/04 11:42:34 UTC

[GitHub] [iotdb] JackieTien97 commented on a diff in pull request #5791: [IOTDB-2845] Implementation of DeviceMergeOperator

JackieTien97 commented on code in PR #5791:
URL: https://github.com/apache/iotdb/pull/5791#discussion_r864697647


##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java:
##########
@@ -335,7 +339,45 @@ public Operator visitSeriesAggregate(
 
     @Override
     public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
-      return super.visitDeviceView(node, context);
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              DeviceViewNode.class.getSimpleName());
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      return new DeviceViewOperator(operatorContext, node.getDevices(), children, null, null);

Review Comment:
   You can use the `measurements` field in `DeviceViewNode` together with the TypeProvider to compute the last two parameters, which are currently passed as `null`.



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators.
+ *
+ * <p>If the devices in different dataNodes are different, we need to output tsBlocks of each node
+ * in order of device. If the same device exists in different nodes, the tsBlocks need to be merged
+ * by time within the device.
+ *
+ * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by
+ * DeviceViewOperator.
+ */
+public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // The size devices and deviceOperators should be the same.
+  private final List<String> devices;
+  private final List<Operator> deviceOperators;
+  private TSDataType[] dataTypes;
+  private TsBlockBuilder tsBlockBuilder;
+
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2
+  private final String[] deviceOfInputTsBlocks;
+  private final boolean[] noMoreTsBlocks;
+
+  private int curDeviceIndex;
+  // the index of curDevice in inputTsBlocks
+  private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>();
+
+  private boolean finished;
+
+  private final TimeSelector timeSelector;
+  private final TimeComparator comparator;
+
+  public DeviceMergeOperator(
+      OperatorContext operatorContext,
+      List<String> devices,
+      List<Operator> deviceOperators,
+      TimeSelector selector,
+      TimeComparator comparator) {
+    this.operatorContext = operatorContext;
+    this.devices = devices;
+    this.deviceOperators = deviceOperators;
+    this.inputOperatorsCount = deviceOperators.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    this.deviceOfInputTsBlocks = new String[inputOperatorsCount];
+    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+    this.timeSelector = selector;
+    this.comparator = comparator;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
+        ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          return blocked;
+        }
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public TsBlock next() {
+    // get new input TsBlock
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) {
+        inputTsBlocks[i] = deviceOperators.get(i).next();
+        deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]);
+        tryToAddCurDeviceTsBlockList(i);
+      }
+    }
+    // move to next device
+    while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 < devices.size()) {
+      getNextDeviceTsBlocks();
+    }
+    // process the curDeviceTsBlocks
+    if (curDeviceTsBlockIndexList.size() == 1) {
+      TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)];
+      inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null;
+      curDeviceTsBlockIndexList.clear();
+      return resultTsBlock;
+    } else {
+      if (tsBlockBuilder == null) {
+        initTsBlockBuilderFromTsBlock(inputTsBlocks[curDeviceTsBlockIndexList.get(0)]);
+      } else {
+        tsBlockBuilder.reset();
+      }
+      int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size();
+      TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice];
+      TsBlockSingleColumnIterator[] tsBlockIterators =
+          new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice];
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)];
+        tsBlockIterators[i] = deviceTsBlocks[i].getTsBlockSingleColumnIterator();
+      }
+      // Use the min end time of all tsBlocks as the end time of result tsBlock
+      // i.e. only one tsBlock will be consumed totally
+      long currentEndTime = deviceTsBlocks[0].getEndTime();
+      for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
+        currentEndTime =
+            comparator.getCurrentEndTime(currentEndTime, deviceTsBlocks[i].getEndTime());
+      }
+
+      TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+      while (!timeSelector.isEmpty()
+          && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
+        long timestamp = timeSelector.pollFirst();
+        timeBuilder.writeLong(timestamp);
+        // TODO process by column
+        // Try to find the tsBlock that timestamp belongs to
+        for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+          // TODO the same timestamp in different data region
+          if (tsBlockIterators[i].hasNext() && tsBlockIterators[i].currentTime() == timestamp) {
+            int rowIndex = tsBlockIterators[i].getRowIndex();
+            for (int j = 0; j < valueColumnBuilders.length; j++) {
+              // the jth column of rowIndex of ith tsBlock
+              if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) {
+                valueColumnBuilders[j].appendNull();
+                continue;
+              }
+              switch (dataTypes[j]) {
+                case BOOLEAN:
+                  valueColumnBuilders[j].writeBoolean(
+                      deviceTsBlocks[i].getColumn(j).getBoolean(rowIndex));
+                  break;
+                case INT32:
+                  valueColumnBuilders[j].writeInt(deviceTsBlocks[i].getColumn(j).getInt(rowIndex));
+                  break;
+                case INT64:
+                  valueColumnBuilders[j].writeLong(
+                      deviceTsBlocks[i].getColumn(j).getLong(rowIndex));
+                  break;
+                case FLOAT:
+                  valueColumnBuilders[j].writeFloat(
+                      deviceTsBlocks[i].getColumn(j).getFloat(rowIndex));
+                  break;
+                case DOUBLE:
+                  valueColumnBuilders[j].writeDouble(
+                      deviceTsBlocks[i].getColumn(j).getDouble(rowIndex));
+                  break;
+                case TEXT:
+                  valueColumnBuilders[j].writeBinary(
+                      deviceTsBlocks[i].getColumn(j).getBinary(rowIndex));
+                  break;
+              }
+            }
+            tsBlockIterators[i].next();
+            break;
+          }
+        }
+        tsBlockBuilder.declarePosition();
+      }
+      // update tsBlock after consuming
+      int consumedTsBlockIndex = 0;
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        if (tsBlockIterators[i].hasNext()) {
+          int rowIndex = tsBlockIterators[i].getRowIndex();
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] =
+              inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex);
+        } else {
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null;
+          consumedTsBlockIndex = i;
+        }
+      }
+      curDeviceTsBlockIndexList.remove(consumedTsBlockIndex);
+      return tsBlockBuilder.build();
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isTsBlockEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (deviceOperators.get(i).hasNext()) {
+          return true;
+        } else {
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator deviceOperator : deviceOperators) {
+      deviceOperator.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i]
+      if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+
+  private void initTsBlockBuilderFromTsBlock(TsBlock tsBlock) {
+    dataTypes = tsBlock.getValueDataTypes();
+    tsBlockBuilder = new TsBlockBuilder(Arrays.asList(dataTypes));
+  }

Review Comment:
   You don't have to get the data types from the child's TsBlock at runtime. Instead, you can determine the data types in `LocalExecutionPlanner` before the operator runs.



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators.
+ *
+ * <p>If the devices in different dataNodes are different, we need to output tsBlocks of each node
+ * in order of device. If the same device exists in different nodes, the tsBlocks need to be merged
+ * by time within the device.
+ *
+ * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by
+ * DeviceViewOperator.
+ */
+public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // The size devices and deviceOperators should be the same.

Review Comment:
   The size of `devices` is the total number of devices, but the size of `deviceOperators` is the number of child operators; the two are not the same.



##########
tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java:
##########
@@ -213,6 +214,14 @@ public Column[] getColumns(int[] columnIndexes) {
     return columns;
   }
 
+  public TSDataType[] getValueDataTypes() {
+    TSDataType[] dataTypes = new TSDataType[valueColumns.length];
+    for (int i = 0; i < valueColumns.length; i++) {
+      dataTypes[i] = valueColumns[i].getDataType();
+    }
+    return dataTypes;
+  }

Review Comment:
   It is better not to get TSDataType info from TsBlock, because we may make TsBlock typeless, which would mean that Float and Int share the same Int32Column.



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators.
+ *
+ * <p>If the devices in different dataNodes are different, we need to output tsBlocks of each node
+ * in order of device. If the same device exists in different nodes, the tsBlocks need to be merged
+ * by time within the device.
+ *
+ * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by
+ * DeviceViewOperator.
+ */
+public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // The size devices and deviceOperators should be the same.

Review Comment:
   BTW, I think we can get the devices entirely from the child operators, so we don't need to store this field in `DeviceMergeOperator`. This can be optimized in the future.



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators.
+ *
+ * <p>If the devices in different dataNodes are different, we need to output tsBlocks of each node
+ * in order of device. If the same device exists in different nodes, the tsBlocks need to be merged
+ * by time within the device.
+ *
+ * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by
+ * DeviceViewOperator.
+ */
+public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // The size devices and deviceOperators should be the same.
+  private final List<String> devices;
+  private final List<Operator> deviceOperators;
+  private TSDataType[] dataTypes;
+  private TsBlockBuilder tsBlockBuilder;
+
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2
+  private final String[] deviceOfInputTsBlocks;
+  private final boolean[] noMoreTsBlocks;
+
+  private int curDeviceIndex;
+  // the index of curDevice in inputTsBlocks
+  private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>();
+
+  private boolean finished;
+
+  private final TimeSelector timeSelector;
+  private final TimeComparator comparator;
+
+  public DeviceMergeOperator(
+      OperatorContext operatorContext,
+      List<String> devices,
+      List<Operator> deviceOperators,
+      TimeSelector selector,
+      TimeComparator comparator) {
+    this.operatorContext = operatorContext;
+    this.devices = devices;
+    this.deviceOperators = deviceOperators;
+    this.inputOperatorsCount = deviceOperators.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    this.deviceOfInputTsBlocks = new String[inputOperatorsCount];
+    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+    this.timeSelector = selector;
+    this.comparator = comparator;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
+        ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          return blocked;
+        }
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public TsBlock next() {
+    // get new input TsBlock
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) {
+        inputTsBlocks[i] = deviceOperators.get(i).next();
+        deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]);
+        tryToAddCurDeviceTsBlockList(i);
+      }
+    }
+    // move to next device
+    while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 < devices.size()) {
+      getNextDeviceTsBlocks();
+    }
+    // process the curDeviceTsBlocks
+    if (curDeviceTsBlockIndexList.size() == 1) {
+      TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)];
+      inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null;
+      curDeviceTsBlockIndexList.clear();
+      return resultTsBlock;
+    } else {
+      if (tsBlockBuilder == null) {
+        initTsBlockBuilderFromTsBlock(inputTsBlocks[curDeviceTsBlockIndexList.get(0)]);
+      } else {
+        tsBlockBuilder.reset();
+      }
+      int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size();
+      TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice];
+      TsBlockSingleColumnIterator[] tsBlockIterators =
+          new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice];
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)];
+        tsBlockIterators[i] = deviceTsBlocks[i].getTsBlockSingleColumnIterator();
+      }
+      // Use the min end time of all tsBlocks as the end time of result tsBlock
+      // i.e. only one tsBlock will be consumed totally
+      long currentEndTime = deviceTsBlocks[0].getEndTime();
+      for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
+        currentEndTime =
+            comparator.getCurrentEndTime(currentEndTime, deviceTsBlocks[i].getEndTime());
+      }
+
+      TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+      while (!timeSelector.isEmpty()
+          && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
+        long timestamp = timeSelector.pollFirst();
+        timeBuilder.writeLong(timestamp);
+        // TODO process by column
+        // Try to find the tsBlock that timestamp belongs to
+        for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+          // TODO the same timestamp in different data region
+          if (tsBlockIterators[i].hasNext() && tsBlockIterators[i].currentTime() == timestamp) {
+            int rowIndex = tsBlockIterators[i].getRowIndex();
+            for (int j = 0; j < valueColumnBuilders.length; j++) {
+              // the jth column of rowIndex of ith tsBlock
+              if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) {
+                valueColumnBuilders[j].appendNull();
+                continue;
+              }
+              switch (dataTypes[j]) {
+                case BOOLEAN:
+                  valueColumnBuilders[j].writeBoolean(
+                      deviceTsBlocks[i].getColumn(j).getBoolean(rowIndex));
+                  break;
+                case INT32:
+                  valueColumnBuilders[j].writeInt(deviceTsBlocks[i].getColumn(j).getInt(rowIndex));
+                  break;
+                case INT64:
+                  valueColumnBuilders[j].writeLong(
+                      deviceTsBlocks[i].getColumn(j).getLong(rowIndex));
+                  break;
+                case FLOAT:
+                  valueColumnBuilders[j].writeFloat(
+                      deviceTsBlocks[i].getColumn(j).getFloat(rowIndex));
+                  break;
+                case DOUBLE:
+                  valueColumnBuilders[j].writeDouble(
+                      deviceTsBlocks[i].getColumn(j).getDouble(rowIndex));
+                  break;
+                case TEXT:
+                  valueColumnBuilders[j].writeBinary(
+                      deviceTsBlocks[i].getColumn(j).getBinary(rowIndex));
+                  break;
+              }
+            }
+            tsBlockIterators[i].next();
+            break;
+          }
+        }
+        tsBlockBuilder.declarePosition();
+      }
+      // update tsBlock after consuming
+      int consumedTsBlockIndex = 0;
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        if (tsBlockIterators[i].hasNext()) {
+          int rowIndex = tsBlockIterators[i].getRowIndex();
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] =
+              inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex);
+        } else {
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null;
+          consumedTsBlockIndex = i;
+        }
+      }
+      curDeviceTsBlockIndexList.remove(consumedTsBlockIndex);
+      return tsBlockBuilder.build();
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isTsBlockEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (deviceOperators.get(i).hasNext()) {
+          return true;
+        } else {
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator deviceOperator : deviceOperators) {
+      deviceOperator.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i]
+      if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+
+  private void initTsBlockBuilderFromTsBlock(TsBlock tsBlock) {
+    dataTypes = tsBlock.getValueDataTypes();
+    tsBlockBuilder = new TsBlockBuilder(Arrays.asList(dataTypes));
+  }
+
+  /** DeviceColumn must be the first value column of tsBlock transferred by DeviceViewOperator. */
+  private String getDeviceNameFromTsBlock(TsBlock tsBlock) {
+    if (tsBlock.getColumn(0).isNull(0)) {

Review Comment:
   ```suggestion
       if (tsBlock == null || tsBlock.getPositionCount() == 0 || tsBlock.getColumn(0).isNull(0)) {
   ```



##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators.
+ *
+ * <p>If the devices in different dataNodes are different, we need to output tsBlocks of each node
+ * in order of device. If the same device exists in different nodes, the tsBlocks need to be merged
+ * by time within the device.
+ *
+ * <p>The tsBlocks from all input operators must have strictly the same form, namely the one
+ * passed on by DeviceViewOperator.
+ */
+public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // The sizes of devices and deviceOperators should be the same.
+  private final List<String> devices;
+  private final List<Operator> deviceOperators;
+  private TSDataType[] dataTypes;
+  private TsBlockBuilder tsBlockBuilder;
+
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2
+  private final String[] deviceOfInputTsBlocks;
+  private final boolean[] noMoreTsBlocks;
+
+  private int curDeviceIndex;
+  // the index of curDevice in inputTsBlocks
+  private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>();
+
+  private boolean finished;
+
+  private final TimeSelector timeSelector;
+  private final TimeComparator comparator;
+
+  public DeviceMergeOperator(
+      OperatorContext operatorContext,
+      List<String> devices,
+      List<Operator> deviceOperators,
+      TimeSelector selector,
+      TimeComparator comparator) {
+    this.operatorContext = operatorContext;
+    this.devices = devices;
+    this.deviceOperators = deviceOperators;
+    this.inputOperatorsCount = deviceOperators.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    this.deviceOfInputTsBlocks = new String[inputOperatorsCount];
+    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+    this.timeSelector = selector;
+    this.comparator = comparator;
+  }
+
+  /**
+   * Returns the operator context that was supplied at construction time.
+   *
+   * @return the context of this operator
+   */
+  @Override
+  public OperatorContext getOperatorContext() {
+    return this.operatorContext;
+  }
+
+  /**
+   * Reports whether this operator is waiting on one of its children.
+   *
+   * <p>Only children that are not exhausted and whose input slot is currently empty are asked.
+   * The first such child whose future is not yet done determines the result.
+   *
+   * @return the pending future of the first blocking child, or {@code NOT_BLOCKED} if none
+   */
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    for (int idx = 0; idx < inputOperatorsCount; idx++) {
+      // skip children that are exhausted or whose slot still holds data
+      if (noMoreTsBlocks[idx] || !isTsBlockEmpty(idx)) {
+        continue;
+      }
+      ListenableFuture<Void> childFuture = deviceOperators.get(idx).isBlocked();
+      if (!childFuture.isDone()) {
+        return childFuture;
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public TsBlock next() {
+    // get new input TsBlock
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) {
+        inputTsBlocks[i] = deviceOperators.get(i).next();
+        deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]);
+        tryToAddCurDeviceTsBlockList(i);
+      }
+    }
+    // move to next device
+    while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 < devices.size()) {
+      getNextDeviceTsBlocks();
+    }
+    // process the curDeviceTsBlocks
+    if (curDeviceTsBlockIndexList.size() == 1) {
+      TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)];
+      inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null;
+      curDeviceTsBlockIndexList.clear();
+      return resultTsBlock;
+    } else {
+      if (tsBlockBuilder == null) {
+        initTsBlockBuilderFromTsBlock(inputTsBlocks[curDeviceTsBlockIndexList.get(0)]);
+      } else {
+        tsBlockBuilder.reset();
+      }
+      int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size();
+      TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice];
+      TsBlockSingleColumnIterator[] tsBlockIterators =
+          new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice];
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)];
+        tsBlockIterators[i] = deviceTsBlocks[i].getTsBlockSingleColumnIterator();
+      }
+      // Use the min end time of all tsBlocks as the end time of result tsBlock
+      // i.e. only one tsBlock will be consumed totally
+      long currentEndTime = deviceTsBlocks[0].getEndTime();
+      for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
+        currentEndTime =
+            comparator.getCurrentEndTime(currentEndTime, deviceTsBlocks[i].getEndTime());
+      }
+
+      TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+      while (!timeSelector.isEmpty()
+          && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
+        long timestamp = timeSelector.pollFirst();
+        timeBuilder.writeLong(timestamp);
+        // TODO process by column
+        // Try to find the tsBlock that timestamp belongs to
+        for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+          // TODO the same timestamp in different data region
+          if (tsBlockIterators[i].hasNext() && tsBlockIterators[i].currentTime() == timestamp) {
+            int rowIndex = tsBlockIterators[i].getRowIndex();
+            for (int j = 0; j < valueColumnBuilders.length; j++) {
+              // the jth column of rowIndex of ith tsBlock
+              if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) {
+                valueColumnBuilders[j].appendNull();
+                continue;
+              }
+              switch (dataTypes[j]) {
+                case BOOLEAN:
+                  valueColumnBuilders[j].writeBoolean(
+                      deviceTsBlocks[i].getColumn(j).getBoolean(rowIndex));
+                  break;
+                case INT32:
+                  valueColumnBuilders[j].writeInt(deviceTsBlocks[i].getColumn(j).getInt(rowIndex));
+                  break;
+                case INT64:
+                  valueColumnBuilders[j].writeLong(
+                      deviceTsBlocks[i].getColumn(j).getLong(rowIndex));
+                  break;
+                case FLOAT:
+                  valueColumnBuilders[j].writeFloat(
+                      deviceTsBlocks[i].getColumn(j).getFloat(rowIndex));
+                  break;
+                case DOUBLE:
+                  valueColumnBuilders[j].writeDouble(
+                      deviceTsBlocks[i].getColumn(j).getDouble(rowIndex));
+                  break;
+                case TEXT:
+                  valueColumnBuilders[j].writeBinary(
+                      deviceTsBlocks[i].getColumn(j).getBinary(rowIndex));
+                  break;
+              }

Review Comment:
   Use the `write(Column column, int index)` method of `ColumnBuilder` to remove this ugly switch-case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org