You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/05 00:47:16 UTC

[iotdb] branch master updated: [IOTDB-2845] Implementation of DeviceMergeOperator (#5791)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new de84a863a7 [IOTDB-2845] Implementation of DeviceMergeOperator (#5791)
de84a863a7 is described below

commit de84a863a76ed5ba0a2570a7e846149d80461e78
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu May 5 08:47:10 2022 +0800

    [IOTDB-2845] Implementation of DeviceMergeOperator (#5791)
---
 .../operator/process/DeviceMergeOperator.java      | 274 +++++++++++
 .../operator/process/DeviceViewOperator.java       |   8 +-
 .../operator/process/TimeJoinOperator.java         |   5 +-
 .../operator/process/merge/AscTimeComparator.java  |   4 +-
 .../operator/process/merge/ColumnMerger.java       |   2 +-
 .../operator/process/merge/DescTimeComparator.java |   4 +-
 .../operator/process/merge/SingleColumnMerger.java |   5 +-
 .../operator/process/merge/TimeComparator.java     |   4 +-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  45 +-
 .../operator/DeviceMergeOperatorTest.java          | 547 +++++++++++++++++++++
 .../execution/operator/DeviceViewOperatorTest.java |   8 +-
 11 files changed, 886 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
new file mode 100644
index 0000000000..aa71bda8be
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * DeviceMergeOperator is responsible for merging tsBlock coming from DeviceViewOperators.
+ *
+ * <p>If the devices in different dataNodes are different, we need to output tsBlocks of each node
+ * in order of device. If the same device exists in different nodes, the tsBlocks need to be merged
+ * by time within the device.
+ *
+ * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by
+ * DeviceViewOperator.
+ */
+public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final List<String> devices;
+  private final List<Operator> deviceOperators;
+  private final List<TSDataType> dataTypes;
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks;
+  // device name of inputTsBlocks[], e.g. d1 in tsBlock1, d2 in tsBlock2
+  private final String[] deviceOfInputTsBlocks;
+  private final boolean[] noMoreTsBlocks;
+
+  private int curDeviceIndex;
+  // the index of curDevice in inputTsBlocks
+  private LinkedList<Integer> curDeviceTsBlockIndexList = new LinkedList<>();
+
+  private boolean finished;
+
+  private final TimeSelector timeSelector;
+  private final TimeComparator comparator;
+
+  public DeviceMergeOperator(
+      OperatorContext operatorContext,
+      List<String> devices,
+      List<Operator> deviceOperators,
+      List<TSDataType> dataTypes,
+      TimeSelector selector,
+      TimeComparator comparator) {
+    this.operatorContext = operatorContext;
+    this.devices = devices;
+    this.deviceOperators = deviceOperators;
+    this.inputOperatorsCount = deviceOperators.size();
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    this.deviceOfInputTsBlocks = new String[inputOperatorsCount];
+    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+    this.dataTypes = dataTypes;
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.timeSelector = selector;
+    this.comparator = comparator;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
+        ListenableFuture<Void> blocked = deviceOperators.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          return blocked;
+        }
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public TsBlock next() {
+    // get new input TsBlock
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && deviceOperators.get(i).hasNext()) {
+        inputTsBlocks[i] = deviceOperators.get(i).next();
+        deviceOfInputTsBlocks[i] = getDeviceNameFromTsBlock(inputTsBlocks[i]);
+        tryToAddCurDeviceTsBlockList(i);
+      }
+    }
+    // move to next device
+    while (curDeviceTsBlockIndexList.isEmpty() && curDeviceIndex + 1 < devices.size()) {
+      getNextDeviceTsBlocks();
+    }
+    // process the curDeviceTsBlocks
+    if (curDeviceTsBlockIndexList.size() == 1) {
+      TsBlock resultTsBlock = inputTsBlocks[curDeviceTsBlockIndexList.get(0)];
+      inputTsBlocks[curDeviceTsBlockIndexList.get(0)] = null;
+      curDeviceTsBlockIndexList.clear();
+      return resultTsBlock;
+    } else {
+      tsBlockBuilder.reset();
+      int tsBlockSizeOfCurDevice = curDeviceTsBlockIndexList.size();
+      TsBlock[] deviceTsBlocks = new TsBlock[tsBlockSizeOfCurDevice];
+      TsBlockSingleColumnIterator[] tsBlockIterators =
+          new TsBlockSingleColumnIterator[tsBlockSizeOfCurDevice];
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        deviceTsBlocks[i] = inputTsBlocks[curDeviceTsBlockIndexList.get(i)];
+        tsBlockIterators[i] = deviceTsBlocks[i].getTsBlockSingleColumnIterator();
+      }
+      // Use the min end time of all tsBlocks as the end time of result tsBlock
+      // i.e. only one tsBlock will be consumed totally
+      long currentEndTime = deviceTsBlocks[0].getEndTime();
+      for (int i = 1; i < tsBlockSizeOfCurDevice; i++) {
+        currentEndTime =
+            comparator.getCurrentEndTime(currentEndTime, deviceTsBlocks[i].getEndTime());
+      }
+
+      TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+      ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+      while (!timeSelector.isEmpty()
+          && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
+        long timestamp = timeSelector.pollFirst();
+        timeBuilder.writeLong(timestamp);
+        // TODO process by column
+        // Try to find the tsBlock that timestamp belongs to
+        for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+          // TODO the same timestamp in different data region
+          if (tsBlockIterators[i].hasNext() && tsBlockIterators[i].currentTime() == timestamp) {
+            int rowIndex = tsBlockIterators[i].getRowIndex();
+            for (int j = 0; j < valueColumnBuilders.length; j++) {
+              // the jth column of rowIndex of ith tsBlock
+              if (deviceTsBlocks[i].getColumn(j).isNull(rowIndex)) {
+                valueColumnBuilders[j].appendNull();
+                continue;
+              }
+              valueColumnBuilders[j].write(deviceTsBlocks[i].getColumn(j), rowIndex);
+            }
+            tsBlockIterators[i].next();
+            break;
+          }
+        }
+        tsBlockBuilder.declarePosition();
+      }
+      // update tsBlock after consuming
+      int consumedTsBlockIndex = 0;
+      for (int i = 0; i < tsBlockSizeOfCurDevice; i++) {
+        if (tsBlockIterators[i].hasNext()) {
+          int rowIndex = tsBlockIterators[i].getRowIndex();
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] =
+              inputTsBlocks[curDeviceTsBlockIndexList.get(i)].subTsBlock(rowIndex);
+        } else {
+          inputTsBlocks[curDeviceTsBlockIndexList.get(i)] = null;
+          consumedTsBlockIndex = i;
+        }
+      }
+      curDeviceTsBlockIndexList.remove(consumedTsBlockIndex);
+      return tsBlockBuilder.build();
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isTsBlockEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (deviceOperators.get(i).hasNext()) {
+          return true;
+        } else {
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (Operator deviceOperator : deviceOperators) {
+      deviceOperator.close();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i]
+      if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+
+  /** DeviceColumn must be the first value column of tsBlock transferred by DeviceViewOperator. */
+  private String getDeviceNameFromTsBlock(TsBlock tsBlock) {
+    if (tsBlock == null || tsBlock.getPositionCount() == 0 || tsBlock.getColumn(0).isNull(0)) {
+      return null;
+    }
+    return tsBlock.getColumn(0).getBinary(0).toString();
+  }
+
+  private String getCurDeviceName() {
+    return devices.get(curDeviceIndex);
+  }
+
+  private void getNextDeviceTsBlocks() {
+    curDeviceIndex++;
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      tryToAddCurDeviceTsBlockList(i);
+    }
+  }
+
+  private void tryToAddCurDeviceTsBlockList(int tsBlockIndex) {
+    if (deviceOfInputTsBlocks[tsBlockIndex] != null
+        && deviceOfInputTsBlocks[tsBlockIndex].equals(getCurDeviceName())) {
+      // add tsBlock of curDevice to a list
+      curDeviceTsBlockIndexList.add(tsBlockIndex);
+      // add all timestamp of curDevice to timeSelector
+      int rowSize = inputTsBlocks[tsBlockIndex].getPositionCount();
+      for (int row = 0; row < rowSize; row++) {
+        timeSelector.add(inputTsBlocks[tsBlockIndex].getTimeByIndex(row));
+      }
+    }
+  }
+
+  /**
+   * If the tsBlock of tsBlockIndex is null or has no more data in the tsBlock, return true; else
+   * return false;
+   */
+  private boolean isTsBlockEmpty(int tsBlockIndex) {
+    return inputTsBlocks[tsBlockIndex] == null
+        || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
index a90dcfd0bb..d3165042a6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewOperator.java
@@ -34,10 +34,10 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
 
 /**
- * Since devices have been sorted by the merge order as expected, what DeviceMergeOperator need to
- * do is traversing the device child operators, get all tsBlocks of one device and transform it to
- * the form we need, adding the device column and allocating value column to its expected location,
- * then get the next device operator until no next device.
+ * Since devices have been sorted by the merge order as expected, what DeviceViewOperator need to do
+ * is traversing the device child operators, get all tsBlocks of one device and transform it to the
+ * form we need, adding the device column and allocating value column to its expected location, then
+ * get the next device operator until no next device.
  *
  * <p>The deviceOperators can be timeJoinOperator or seriesScanOperator that have not transformed
  * the result form.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
index 0fe2bb2873..1ca4053049 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
@@ -143,7 +143,7 @@ public class TimeJoinOperator implements ProcessOperator {
       if (!empty(i)) {
         currentEndTime =
             init
-                ? comparator.getSatisfiedTime(currentEndTime, inputTsBlocks[i].getEndTime())
+                ? comparator.getCurrentEndTime(currentEndTime, inputTsBlocks[i].getEndTime())
                 : inputTsBlocks[i].getEndTime();
         init = true;
       }
@@ -156,7 +156,8 @@ public class TimeJoinOperator implements ProcessOperator {
     }
 
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    while (!timeSelector.isEmpty() && comparator.satisfy(timeSelector.first(), currentEndTime)) {
+    while (!timeSelector.isEmpty()
+        && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
       timeBuilder.writeLong(timeSelector.pollFirst());
       tsBlockBuilder.declarePosition();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
index 932262aab7..95b7316844 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
@@ -22,12 +22,12 @@ public class AscTimeComparator implements TimeComparator {
 
   /** @return if order by time asc, return true if time <= endTime, otherwise false */
   @Override
-  public boolean satisfy(long time, long endTime) {
+  public boolean satisfyCurEndTime(long time, long endTime) {
     return time <= endTime;
   }
 
   @Override
-  public long getSatisfiedTime(long time1, long time2) {
+  public long getCurrentEndTime(long time1, long time2) {
     return Math.min(time1, time2);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
index 4d301c8e8a..731827aea5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
@@ -40,7 +40,7 @@ public interface ColumnMerger {
   /**
    * merge columns belonging to same series into one column
    *
-   * @param inputTsBlocks all source TsBlocks, some of which will cantain source column
+   * @param inputTsBlocks all source TsBlocks, some of which will contain source column
    * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
    *     we should only read from this array and not update it because others will use the start
    *     index value in inputIndex array
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
index 85e0e7b572..f53c97d8fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
@@ -22,12 +22,12 @@ public class DescTimeComparator implements TimeComparator {
 
   /** @return if order by time desc, return true if time >= endTime, otherwise false */
   @Override
-  public boolean satisfy(long time, long endTime) {
+  public boolean satisfyCurEndTime(long time, long endTime) {
     return time >= endTime;
   }
 
   @Override
-  public long getSatisfiedTime(long time1, long time2) {
+  public long getCurrentEndTime(long time1, long time2) {
     return Math.max(time1, time2);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
index d7279ed875..92cc2f3a94 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
@@ -53,7 +53,8 @@ public class SingleColumnMerger implements ColumnMerger {
     // input column is empty or current time of input column is already larger than currentEndTime
     // just appendNull rowCount null
     if (empty(tsBlockIndex, inputTsBlocks, inputIndex)
-        || !comparator.satisfy(inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
+        || !comparator.satisfyCurEndTime(
+            inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
       columnBuilder.appendNull(rowCount);
     } else {
       // read from input column and write it into columnBuilder
@@ -63,7 +64,7 @@ public class SingleColumnMerger implements ColumnMerger {
         // current index reaches the size of input column or current time of input column is already
         // larger than currentEndTime, use null column to fill the remaining
         if (timeColumn.getPositionCount() == index
-            || !comparator.satisfy(
+            || !comparator.satisfyCurEndTime(
                 inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
           columnBuilder.appendNull(rowCount - i);
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
index 8587df43dc..db017f6186 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process.merge;
 public interface TimeComparator {
 
   /** @return true if time is satisfied with endTime, otherwise false */
-  boolean satisfy(long time, long endTime);
+  boolean satisfyCurEndTime(long time, long endTime);
 
   /** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */
-  long getSatisfiedTime(long time1, long time2);
+  long getCurrentEndTime(long time1, long time2);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index e78e5e4738..2ab14daa6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
@@ -66,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SeriesSchema
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
@@ -82,6 +85,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OutputColumn;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -335,7 +339,46 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
-      return super.visitDeviceView(node, context);
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              DeviceViewNode.class.getSimpleName());
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      return new DeviceViewOperator(operatorContext, node.getDevices(), children, null, null);
+    }
+
+    @Override
+    public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              DeviceViewNode.class.getSimpleName());
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+      TimeSelector selector = null;
+      TimeComparator timeComparator = null;
+      for (OrderBy orderBy : node.getMergeOrders()) {
+        switch (orderBy) {
+          case TIMESTAMP_ASC:
+            selector = new TimeSelector(node.getChildren().size() << 1, true);
+            timeComparator = ASC_TIME_COMPARATOR;
+            break;
+          case TIMESTAMP_DESC:
+            selector = new TimeSelector(node.getChildren().size() << 1, false);
+            timeComparator = DESC_TIME_COMPARATOR;
+            break;
+        }
+      }
+      return new DeviceMergeOperator(
+          operatorContext, node.getDevices(), children, dataTypes, selector, timeComparator);
     }
 
     @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
new file mode 100644
index 0000000000..9aa09f18a5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DeviceMergeOperatorTest {
+
+  private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceMergeOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, DEVICE_MERGE_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Construct DeviceMergeOperator with different devices in two DeviceViewOperators.
+   *
+   * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
+   *
+   * <p>DeviceViewOperator2: [seriesScanOperator: [device1.sensor1]]
+   *
+   * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
+   * and the sensor0 column of device1 should be null.
+   */
+  @Test
+  public void deviceMergeOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
+
+      List<TSDataType> dataTypes = new ArrayList<>();
+      dataTypes.add(TSDataType.TEXT);
+      dataTypes.add(TSDataType.INT32);
+      dataTypes.add(TSDataType.INT32);
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      DeviceViewOperator deviceViewOperator1 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+              Collections.singletonList(seriesScanOperator1),
+              Collections.singletonList(Collections.singletonList(1)),
+              dataTypes);
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      DeviceViewOperator deviceViewOperator2 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1"),
+              Collections.singletonList(seriesScanOperator2),
+              Collections.singletonList(Collections.singletonList(2)),
+              dataTypes);
+
+      List<String> devices = new ArrayList<>();
+      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
+      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
+      List<Operator> deviceOperators = new ArrayList<>();
+      deviceOperators.add(deviceViewOperator1);
+      deviceOperators.add(deviceViewOperator2);
+      DeviceMergeOperator deviceMergeOperator =
+          new DeviceMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              devices,
+              deviceOperators,
+              dataTypes,
+              new TimeSelector(500, true),
+              new AscTimeComparator());
+
+      int count = 0;
+      while (deviceMergeOperator.hasNext()) {
+        TsBlock tsBlock = deviceMergeOperator.next();
+        assertEquals(3, tsBlock.getValueColumnCount());
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * (count % 25);
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          assertEquals(
+              count < 25
+                  ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
+                  : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
+              tsBlock.getColumn(0).getBinary(i).getStringValue());
+          if (expectedTime < 200) {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          } else {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        count++;
+      }
+      assertEquals(50, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /**
+   * Construct DeviceMergeOperator with the same device in two DeviceViewOperators.
+   *
+   * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0]],
+   *
+   * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
+   *
+   * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
+   * and the sensor0 column of device1 should be null.
+   */
+  @Test
+  public void deviceMergeOperatorTest2() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, DeviceViewOperatorTest.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, DeviceViewOperatorTest.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, DeviceMergeOperator.class.getSimpleName());
+
+      List<TSDataType> dataTypes = new ArrayList<>();
+      dataTypes.add(TSDataType.TEXT);
+      dataTypes.add(TSDataType.INT32);
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      List<TsFileResource> seqResources1 = new ArrayList<>();
+      List<TsFileResource> unSeqResources1 = new ArrayList<>();
+      seqResources1.add(seqResources.get(0));
+      seqResources1.add(seqResources.get(1));
+      seqResources1.add(seqResources.get(3));
+      unSeqResources1.add(unSeqResources.get(0));
+      unSeqResources1.add(unSeqResources.get(1));
+      unSeqResources1.add(unSeqResources.get(3));
+      unSeqResources1.add(unSeqResources.get(5));
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
+      DeviceViewOperator deviceViewOperator1 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+              Collections.singletonList(seriesScanOperator1),
+              Collections.singletonList(Collections.singletonList(1)),
+              dataTypes);
+
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      List<TsFileResource> seqResources2 = new ArrayList<>();
+      List<TsFileResource> unSeqResources2 = new ArrayList<>();
+      seqResources2.add(seqResources.get(2));
+      seqResources2.add(seqResources.get(4));
+      unSeqResources2.add(unSeqResources.get(2));
+      unSeqResources2.add(unSeqResources.get(4));
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
+      DeviceViewOperator deviceViewOperator2 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+              Collections.singletonList(seriesScanOperator2),
+              Collections.singletonList(Collections.singletonList(1)),
+              dataTypes);
+
+      List<String> devices = new ArrayList<>();
+      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
+      List<Operator> deviceOperators = new ArrayList<>();
+      deviceOperators.add(deviceViewOperator1);
+      deviceOperators.add(deviceViewOperator2);
+      DeviceMergeOperator deviceMergeOperator =
+          new DeviceMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              devices,
+              deviceOperators,
+              dataTypes,
+              new TimeSelector(500, true),
+              new AscTimeComparator());
+
+      int count = 0;
+      while (deviceMergeOperator.hasNext()) {
+        TsBlock tsBlock = deviceMergeOperator.next();
+        assertEquals(2, tsBlock.getValueColumnCount());
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * (count % 25);
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          assertEquals(
+              DEVICE_MERGE_OPERATOR_TEST_SG + ".device0",
+              tsBlock.getColumn(0).getBinary(i).getStringValue());
+          if (expectedTime < 200) {
+            assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+          } else {
+            assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+          }
+        }
+        count++;
+      }
+      assertEquals(25, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
+  /**
+   * Construct DeviceMergeOperator with the same and different device at the same time in two
+   * DeviceViewOperators.
+   *
+   * <p>DeviceViewOperator1: [seriesScanOperator: [device0.sensor0], [device1.sensor1]],
+   *
+   * <p>DeviceViewOperator2: [seriesScanOperator: [device0.sensor0]]
+   *
+   * <p>the result tsBlock should be like [Device, sensor0, sensor1]. The sensor1 column of device0
+   * and the sensor0 column of device1 should be null.
+   */
+  @Test
+  public void deviceMergeOperatorTest3() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          4, new PlanNodeId("4"), DeviceViewOperatorTest.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          5, new PlanNodeId("5"), DeviceViewOperatorTest.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          6, new PlanNodeId("6"), DeviceMergeOperator.class.getSimpleName());
+
+      List<TSDataType> dataTypes = new ArrayList<>();
+      dataTypes.add(TSDataType.TEXT);
+      dataTypes.add(TSDataType.INT32);
+      dataTypes.add(TSDataType.INT32);
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      List<TsFileResource> seqResources1 = new ArrayList<>();
+      List<TsFileResource> unSeqResources1 = new ArrayList<>();
+      seqResources1.add(seqResources.get(0));
+      seqResources1.add(seqResources.get(1));
+      seqResources1.add(seqResources.get(3));
+      unSeqResources1.add(unSeqResources.get(0));
+      unSeqResources1.add(unSeqResources.get(1));
+      unSeqResources1.add(unSeqResources.get(3));
+      unSeqResources1.add(unSeqResources.get(5));
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources1, unSeqResources1));
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      List<String> devices = new ArrayList<>();
+      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0");
+      devices.add(DEVICE_MERGE_OPERATOR_TEST_SG + ".device1");
+      List<Operator> deviceOperators = new ArrayList<>();
+      deviceOperators.add(seriesScanOperator1);
+      deviceOperators.add(seriesScanOperator2);
+      List<List<Integer>> deviceColumnIndex = new ArrayList<>();
+      deviceColumnIndex.add(Collections.singletonList(1));
+      deviceColumnIndex.add(Collections.singletonList(2));
+      DeviceViewOperator deviceViewOperator1 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              devices,
+              deviceOperators,
+              deviceColumnIndex,
+              dataTypes);
+
+      SeriesScanOperator seriesScanOperator3 =
+          new SeriesScanOperator(
+              planNodeId3,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              null,
+              null,
+              true);
+      List<TsFileResource> seqResources2 = new ArrayList<>();
+      List<TsFileResource> unSeqResources2 = new ArrayList<>();
+      seqResources2.add(seqResources.get(2));
+      seqResources2.add(seqResources.get(4));
+      unSeqResources2.add(unSeqResources.get(2));
+      unSeqResources2.add(unSeqResources.get(4));
+      seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources2, unSeqResources2));
+      DeviceViewOperator deviceViewOperator2 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              Collections.singletonList(DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"),
+              Collections.singletonList(seriesScanOperator3),
+              Collections.singletonList(Collections.singletonList(1)),
+              dataTypes);
+
+      List<Operator> deviceViewOperators = new ArrayList<>();
+      deviceViewOperators.add(deviceViewOperator1);
+      deviceViewOperators.add(deviceViewOperator2);
+      DeviceMergeOperator deviceMergeOperator =
+          new DeviceMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              devices,
+              deviceViewOperators,
+              dataTypes,
+              new TimeSelector(500, true),
+              new AscTimeComparator());
+
+      int count = 0;
+      while (deviceMergeOperator.hasNext()) {
+        TsBlock tsBlock = deviceMergeOperator.next();
+        assertEquals(3, tsBlock.getValueColumnCount());
+        assertEquals(20, tsBlock.getPositionCount());
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * (count % 25);
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          assertEquals(
+              count < 25
+                  ? DEVICE_MERGE_OPERATOR_TEST_SG + ".device0"
+                  : DEVICE_MERGE_OPERATOR_TEST_SG + ".device1",
+              tsBlock.getColumn(0).getBinary(i).getStringValue());
+          if (expectedTime < 200) {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(20000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(10000 + expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          } else {
+            if (!tsBlock.getColumn(1).isNull(i)) {
+              assertEquals(expectedTime, tsBlock.getColumn(1).getInt(i));
+              assertTrue(tsBlock.getColumn(2).isNull(i));
+            } else {
+              assertEquals(expectedTime, tsBlock.getColumn(2).getInt(i));
+            }
+          }
+        }
+        count++;
+      }
+      assertEquals(50, count);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
index 2b9ab37160..b98ced7ea7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceViewOperatorTest.java
@@ -55,7 +55,7 @@ import static org.junit.Assert.fail;
 
 public class DeviceViewOperatorTest {
 
-  private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceMergeOperatorTest";
+  private static final String DEVICE_MERGE_OPERATOR_TEST_SG = "root.DeviceViewOperatorTest";
   private final List<String> deviceIds = new ArrayList<>();
   private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
 
@@ -142,7 +142,7 @@ public class DeviceViewOperatorTest {
       dataTypes.add(TSDataType.INT32);
       dataTypes.add(TSDataType.INT32);
 
-      DeviceViewOperator deviceMergeOperator =
+      DeviceViewOperator deviceViewOperator =
           new DeviceViewOperator(
               fragmentInstanceContext.getOperatorContexts().get(2),
               devices,
@@ -150,8 +150,8 @@ public class DeviceViewOperatorTest {
               deviceColumnIndex,
               dataTypes);
       int count = 0;
-      while (deviceMergeOperator.hasNext()) {
-        TsBlock tsBlock = deviceMergeOperator.next();
+      while (deviceViewOperator.hasNext()) {
+        TsBlock tsBlock = deviceViewOperator.next();
         assertEquals(3, tsBlock.getValueColumnCount());
         assertEquals(20, tsBlock.getPositionCount());
         for (int i = 0; i < tsBlock.getPositionCount(); i++) {