You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/09 13:32:58 UTC

[iotdb] branch master updated: [IOTDB-4572] [IOTDB-3580] support order by in align by device

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 150a5dc7ab [IOTDB-4572] [IOTDB-3580] support order by in align by device
150a5dc7ab is described below

commit 150a5dc7ab2666291bc4088549b12990dc755ad1
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Fri Dec 9 21:32:50 2022 +0800

    [IOTDB-4572] [IOTDB-3580] support order by in align by device
---
 .../IoTDBOrderByWithAlignByDeviceIT.java           | 1178 ++++++++++++++++
 .../iotdb/db/it/query/IoTDBNullValueFillIT.java    |    2 +-
 .../operator/process/MergeSortOperator.java        |  227 +++
 .../operator/process/SingleDeviceViewOperator.java |  137 ++
 .../process/join/merge/MergeSortComparator.java    |  145 ++
 .../plan/planner/LocalExecutionPlanContext.java    |   11 +
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |   77 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |    9 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |   48 +
 .../planner/distribution/DistributionPlanner.java  |    8 +-
 .../planner/distribution/ExchangeNodeAdder.java    |  114 +-
 .../plan/planner/distribution/SourceRewriter.java  |  186 ++-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   25 +
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   10 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   10 +
 .../planner/plan/node/SimplePlanNodeRewriter.java  |   20 +-
 .../planner/plan/node/process/MergeSortNode.java   |  123 ++
 .../plan/node/process/SingleDeviceViewNode.java    |  145 ++
 .../plan/statement/component/OrderByComponent.java |    3 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |   11 +-
 .../db/utils/datastructure/MergeSortHeap.java      |  113 ++
 .../iotdb/db/utils/datastructure/MergeSortKey.java |   41 +
 .../execution/operator/MergeSortOperatorTest.java  | 1475 ++++++++++++++++++++
 .../operator/SingleDeviceViewOperatorTest.java     |  207 +++
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     |   10 +-
 .../distribution/AggregationDistributionTest.java  |    4 +-
 26 files changed, 4209 insertions(+), 130 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
new file mode 100644
index 0000000000..e6bbd3633a
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
@@ -0,0 +1,1178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBOrderByWithAlignByDeviceIT {
+  // Device paths used as the test fixture; each one gets a precipitation and a temperature series.
+  private static final String[] places =
+      new String[] {
+        "root.weather.London",
+        "root.weather.Edinburgh",
+        "root.weather.Belfast",
+        "root.weather.Birmingham",
+        "root.weather.Liverpool",
+        "root.weather.Derby",
+        "root.weather.Durham",
+        "root.weather.Hereford",
+        "root.weather.Manchester",
+        "root.weather.Oxford"
+      };
+  // Base offsets added to every generated precipitation / temperature value in insertData().
+  private static final long startPrecipitation = 200;
+  private static final double startTemperature = 20.0;
+  // Timestamp of the first point of the first device; each following device starts one timeGap
+  // later (see insertData()).
+  private static final long startTime = 1668960000000L;
+  // Number of points written per device.
+  private static final int numOfPointsInDevice = 20;
+  // Distance between two consecutive timestamps of one device.
+  private static final long timeGap = 100L;
+  // Device path -> timestamp of that device's first inserted point.
+  private static final Map<String, Long> deviceToStartTimestamp = new HashMap<>();
+  // Device path -> three per-bucket expected values, filled by insertData().
+  public static final Map<String, double[]> deviceToMaxTemperature = new HashMap<>();
+  public static final Map<String, double[]> deviceToAvgTemperature = new HashMap<>();
+  public static final Map<String, long[]> deviceToMaxPrecipitation = new HashMap<>();
+  public static final Map<String, double[]> deviceToAvgPrecipitation = new HashMap<>();
+
+  /** Initializes the test environment once for the whole class and then loads the fixture data. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeClass();
+    insertData();
+  }
+
+  /** Cleans up the test environment after all tests of this class have run. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+  }
+
+  /**
+   * Generates the test data. Every device gets {@code numOfPointsInDevice} points spaced {@code
+   * timeGap} apart, and each device starts one {@code timeGap} later than the previous one, so the
+   * time ranges of the devices overlap.
+   *
+   * <p>Besides writing the points, this method fills the {@code deviceTo*} maps with per-bucket
+   * maximum and average values, where a bucket is ten consecutive time slots.
+   *
+   * <p>The data can be viewed in the online doc:
+   *
+   * <p>https://docs.google.com/spreadsheets/d/18XlOIi27ZIIdRnar2WNXVMxkZwjgwlPZmzJLVpZRpAA/edit#gid=0
+   */
+  private static void insertData() {
+    try (Connection iotDBConnection = EnvFactory.getEnv().getConnection();
+        Statement statement = iotDBConnection.createStatement()) {
+      // create TimeSeries
+      for (String place : places) {
+        String precipitationPath = place + ".precipitation";
+        String temperaturePath = place + ".temperature";
+        statement.execute(
+            "CREATE TIMESERIES " + precipitationPath + " WITH DATATYPE=INT64, ENCODING=RLE");
+        statement.execute(
+            "CREATE TIMESERIES " + temperaturePath + " WITH DATATYPE=DOUBLE, ENCODING=RLE");
+      }
+
+      // insert data
+      // The last device starts (places.length - 1) slots after the first one, so this many slots
+      // cover every inserted timestamp (29 for the current fixture).
+      int slotCount = numOfPointsInDevice + places.length - 1;
+      long start = startTime;
+      double[][] temperatures = new double[places.length][slotCount];
+      long[][] precipitations = new long[places.length][slotCount];
+      for (int index = 0; index < places.length; index++) {
+        String place = places[index];
+        deviceToStartTimestamp.put(place, start);
+
+        for (int i = 0; i < numOfPointsInDevice; i++) {
+          long timestamp = start + i * timeGap;
+          int slot = (int) ((start - startTime) / timeGap) + i;
+          long precipitation = startPrecipitation + place.hashCode() + timestamp;
+          double temperature = startTemperature + place.hashCode() + timestamp;
+          precipitations[index][slot] = precipitation;
+          temperatures[index][slot] = temperature;
+          String insertUniqueTime =
+              "INSERT INTO "
+                  + place
+                  + "(timestamp,precipitation,temperature) VALUES("
+                  + timestamp
+                  + ","
+                  + precipitation
+                  + ","
+                  + temperature
+                  + ")";
+          statement.execute(insertUniqueTime);
+        }
+        start += timeGap;
+      }
+
+      for (int i = 0; i < places.length; i++) {
+        double[] aT = new double[3];
+        double[] aP = new double[3];
+        double[] mT = new double[3];
+        long[] mP = new long[3];
+        double totalTemperature = 0;
+        long totalPrecipitation = 0;
+        double maxTemperature = -1;
+        long maxPrecipitation = -1;
+        int cnt = 0;
+        for (int j = 0; j < precipitations[i].length; j++) {
+          totalTemperature += temperatures[i][j];
+          totalPrecipitation += precipitations[i][j];
+          maxPrecipitation = Math.max(maxPrecipitation, precipitations[i][j]);
+          maxTemperature = Math.max(maxTemperature, temperatures[i][j]);
+          // Close a bucket every ten slots, and once more at the final (shorter) tail.
+          if ((j + 1) % 10 == 0 || j == precipitations[i].length - 1) {
+            // NOTE(review): the sum is always divided by 10, even when the bucket holds fewer
+            // than ten inserted points - confirm this matches how the consumers of these maps
+            // compare against AVG().
+            aT[cnt] = totalTemperature / 10;
+            aP[cnt] = (double) totalPrecipitation / 10;
+            mP[cnt] = maxPrecipitation;
+            mT[cnt] = maxTemperature;
+            cnt++;
+            totalTemperature = 0;
+            totalPrecipitation = 0;
+            maxTemperature = -1;
+            maxPrecipitation = -1;
+          }
+        }
+        deviceToMaxTemperature.put(places[i], mT);
+        deviceToMaxPrecipitation.put(places[i], mP);
+        deviceToAvgTemperature.put(places[i], aT);
+        deviceToAvgPrecipitation.put(places[i], aP);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      // A broken fixture must abort the class instead of letting every test fail with a
+      // misleading data mismatch.
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Asserts that the columns of a result set match the expected header, both in number and in
+   * name.
+   *
+   * @param resultSetMetaData metadata of the result set under test
+   * @param title expected column names, comma-separated, in column order
+   * @throws SQLException if the metadata cannot be read
+   */
+  private void checkHeader(ResultSetMetaData resultSetMetaData, String title) throws SQLException {
+    String[] headers = title.split(",");
+    int columnCount = resultSetMetaData.getColumnCount();
+    // Check the size first: otherwise a result with too few columns passes silently and one with
+    // too many columns dies with an ArrayIndexOutOfBoundsException instead of a clear failure.
+    assertEquals(headers.length, columnCount);
+    for (int i = 1; i <= columnCount; i++) {
+      assertEquals(headers[i - 1], resultSetMetaData.getColumnName(i));
+    }
+  }
+
+  // ORDER BY DEVICE
+  /**
+   * {@code ORDER BY DEVICE} without a direction: devices are expected in ascending order and the
+   * rows of each device in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          index++;
+          cnt = 0;
+        }
+      }
+      // Every device must have been fully consumed.
+      assertEquals(places.length, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE ASC}: devices are expected in ascending order and the rows of each
+   * device in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ASC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          index++;
+          cnt = 0;
+        }
+      }
+      // Every device must have been fully consumed.
+      assertEquals(places.length, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE DESC}: devices are expected in descending order while the rows of each
+   * device stay in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE DESC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places).sorted(Comparator.reverseOrder()).toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          index++;
+          cnt = 0;
+        }
+      }
+      // Every device must have been fully consumed.
+      assertEquals(places.length, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // ORDER BY TIME
+
+  /**
+   * {@code ORDER BY TIME} without a direction: timestamps must be non-decreasing, and rows sharing
+   * a timestamp must come in ascending device order.
+   */
+  @Test
+  public void orderByTimeTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // Device order only matters as a tie-breaker between rows with the same timestamp.
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME ASC}: timestamps must be non-decreasing, and rows sharing a timestamp must
+   * come in ascending device order.
+   */
+  @Test
+  public void orderByTimeTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ASC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // Device order only matters as a tie-breaker between rows with the same timestamp.
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME DESC}: timestamps must be non-increasing, and rows sharing a timestamp
+   * must still come in ascending device order.
+   */
+  @Test
+  public void orderByTimeTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME DESC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      long lastTimeStamp = Long.MAX_VALUE;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp <= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // Device order only matters as a tie-breaker between rows with the same timestamp.
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // ORDER BY DEVICE,TIME
+
+  /**
+   * {@code ORDER BY DEVICE ASC,TIME DESC}: devices ascending, and within each device the newest
+   * point first.
+   */
+  @Test
+  public void orderByDeviceTimeTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ASC,TIME DESC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        // Descending time: the cnt-th row of a device is its (last - cnt)-th inserted point.
+        assertEquals(
+            deviceToStartTimestamp.get(actualDevice) + timeGap * (numOfPointsInDevice - cnt - 1),
+            actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          index++;
+          cnt = 0;
+        }
+      }
+      // Every device must have been fully consumed.
+      assertEquals(places.length, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE ASC,TIME ASC}: devices ascending, and within each device the oldest
+   * point first.
+   */
+  @Test
+  public void orderByDeviceTimeTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ASC,TIME ASC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          index++;
+          cnt = 0;
+        }
+      }
+      // Every device must have been fully consumed.
+      assertEquals(places.length, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE DESC,TIME DESC}: devices descending, and within each device the newest
+   * point first.
+   */
+  @Test
+  public void orderByDeviceTimeTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE DESC,TIME DESC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places).sorted(Comparator.reverseOrder()).toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        // Descending time: the cnt-th row of a device is its (last - cnt)-th inserted point.
+        assertEquals(
+            deviceToStartTimestamp.get(actualDevice) + timeGap * (numOfPointsInDevice - cnt - 1),
+            actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          index++;
+          cnt = 0;
+        }
+      }
+      // Every device must have been fully consumed.
+      assertEquals(places.length, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE DESC,TIME ASC}: devices descending, and within each device the oldest
+   * point first.
+   */
+  @Test
+  public void orderByDeviceTimeTest4() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE DESC,TIME ASC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places).sorted(Comparator.reverseOrder()).toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          index++;
+          cnt = 0;
+        }
+      }
+      // Every device must have been fully consumed.
+      assertEquals(places.length, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // ORDER BY TIME,DEVICE
+
+  /**
+   * {@code ORDER BY TIME ASC,DEVICE DESC}: timestamps must be non-decreasing, and rows sharing a
+   * timestamp must come in descending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ASC,DEVICE DESC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // Device order only matters as a tie-breaker between rows with the same timestamp.
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME ASC,DEVICE ASC}: timestamps must be non-decreasing, and rows sharing a
+   * timestamp must come in ascending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ASC,DEVICE ASC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // Device order only matters as a tie-breaker between rows with the same timestamp.
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME DESC,DEVICE DESC}: timestamps must be non-increasing, and rows sharing a
+   * timestamp must come in descending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME DESC,DEVICE DESC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      long lastTimeStamp = Long.MAX_VALUE;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp <= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // Device order only matters as a tie-breaker between rows with the same timestamp.
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME DESC,DEVICE ASC}: timestamps must be non-increasing, and rows sharing a
+   * timestamp must come in ascending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest4() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME DESC,DEVICE ASC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      long lastTimeStamp = Long.MAX_VALUE;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp <= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // Device order only matters as a tie-breaker between rows with the same timestamp.
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // Two-sided tolerance: a plain "expected - actual < eps" accepts any too-large value.
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // aggregation query: expected COUNT values, read by getCountNum as [index of the device in places][index of the time window]
+  private static final int[][] countIn1000MSFiledWith100MSTimeGap =
+      new int[][] {
+        {10, 10, 0},
+        {9, 10, 1},
+        {8, 10, 2},
+        {7, 10, 3},
+        {6, 10, 4},
+        {5, 10, 5},
+        {4, 10, 6},
+        {3, 10, 7},
+        {2, 10, 8},
+        {1, 10, 9}
+      };
+
+  /**
+   * Looks up the expected COUNT value for a device and a time window.
+   *
+   * @param device device name as returned in the Device column
+   * @param cnt index of the time window (column of the expectation table)
+   * @return the table entry of the first entry in places equal to the device; the first row of the
+   *     table is used when no entry matches
+   */
+  private static int getCountNum(String device, int cnt) {
+    for (int position = 0; position < places.length; position++) {
+      if (places[position].equals(device)) {
+        return countIn1000MSFiledWith100MSTimeGap[position][cnt];
+      }
+    }
+    // no matching entry: fall back to the first row
+    return countIn1000MSFiledWith100MSTimeGap[0][cnt];
+  }
+
+  /**
+   * GROUP BY time aggregation with ALIGN BY DEVICE and no ORDER BY clause. Rows are expected device
+   * by device in non-decreasing device order, with the three 1000ms windows in ascending time order
+   * inside each device, and every aggregate must match the precomputed expectation.
+   */
+  @Test
+  public void groupByTimeOrderByDeviceTest1() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = -1;
+      String lastDevice = "";
+      long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the time window expected for the current row
+        int cnt = 0;
+        int rows = 0;
+        while (resultSet.next()) {
+
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertEquals(expectedTime[cnt], actualTime);
+          if (!Objects.equals(lastDevice, "") && Objects.equals(actualDevice, lastDevice)) {
+            assertTrue(actualTime >= lastTime);
+          }
+          if (!Objects.equals(lastDevice, "")) {
+            assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          rows++;
+          cnt++;
+          if (cnt % 3 == 0) {
+            cnt = 0;
+          }
+        }
+        // the groupByTimeOrderByTime tests assert this total for the same FROM and GROUP BY
+        // clauses; without it an empty result set would pass
+        assertEquals(30, rows);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time aggregation with ORDER BY DEVICE DESC and ALIGN BY DEVICE. Rows are expected
+   * device by device in non-increasing device order, with the three 1000ms windows in ascending
+   * time order inside each device, and every aggregate must match the precomputed expectation.
+   */
+  @Test
+  public void groupByTimeOrderByDeviceTest2() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY DEVICE DESC ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = -1;
+      String lastDevice = "";
+      long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the time window expected for the current row
+        int cnt = 0;
+        int rows = 0;
+        while (resultSet.next()) {
+
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertEquals(expectedTime[cnt], actualTime);
+          if (!Objects.equals(lastDevice, "") && Objects.equals(actualDevice, lastDevice)) {
+            assertTrue(actualTime >= lastTime);
+          }
+          if (!Objects.equals(lastDevice, "")) {
+            assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          rows++;
+          cnt++;
+          if (cnt % 3 == 0) {
+            cnt = 0;
+          }
+        }
+        // the groupByTimeOrderByTime tests assert this total for the same FROM and GROUP BY
+        // clauses; without it an empty result set would pass
+        assertEquals(30, rows);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time aggregation with ORDER BY DEVICE DESC,TIME DESC and ALIGN BY DEVICE. Rows are
+   * expected device by device in non-increasing device order, with the three 1000ms windows in
+   * descending time order inside each device, and every aggregate must match the precomputed
+   * expectation.
+   */
+  @Test
+  public void groupByTimeOrderByDeviceTest3() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY DEVICE DESC,TIME DESC ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = -1;
+      String lastDevice = "";
+      long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the time window expected for the current row; it walks 2, 1, 0
+        int cnt = 2;
+        int rows = 0;
+        while (resultSet.next()) {
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertEquals(expectedTime[cnt], actualTime);
+          if (!Objects.equals(lastDevice, "") && Objects.equals(actualDevice, lastDevice)) {
+            assertTrue(actualTime <= lastTime);
+          }
+          if (!Objects.equals(lastDevice, "")) {
+            assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          rows++;
+          cnt--;
+          if (cnt < 0) {
+            cnt = 2;
+          }
+        }
+        // the groupByTimeOrderByTime tests assert this total for the same FROM and GROUP BY
+        // clauses; without it an empty result set would pass
+        assertEquals(30, rows);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time aggregation with ORDER BY DEVICE ASC,TIME DESC and ALIGN BY DEVICE. Rows are
+   * expected device by device in non-decreasing device order, with the three 1000ms windows in
+   * descending time order inside each device, and every aggregate must match the precomputed
+   * expectation.
+   */
+  @Test
+  public void groupByTimeOrderByDeviceTest4() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY DEVICE ASC,TIME DESC ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = -1;
+      String lastDevice = "";
+      long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the time window expected for the current row; it walks 2, 1, 0
+        int cnt = 2;
+        int rows = 0;
+        while (resultSet.next()) {
+
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertEquals(expectedTime[cnt], actualTime);
+          if (!Objects.equals(lastDevice, "") && Objects.equals(actualDevice, lastDevice)) {
+            assertTrue(actualTime <= lastTime);
+          }
+          if (!Objects.equals(lastDevice, "")) {
+            assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          rows++;
+          cnt--;
+          if (cnt < 0) {
+            cnt = 2;
+          }
+        }
+        // the groupByTimeOrderByTime tests assert this total for the same FROM and GROUP BY
+        // clauses; without it an empty result set would pass
+        assertEquals(30, rows);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time aggregation with ORDER BY TIME and ALIGN BY DEVICE. Timestamps must never
+   * decrease, devices must never decrease among rows sharing a timestamp, every aggregate must
+   * match the precomputed expectation, and exactly 30 rows are expected.
+   */
+  @Test
+  public void groupByTimeOrderByTimeTest1() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = -1;
+      String lastDevice = "";
+      int index = 0;
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the expected time window; it advances after every 10 rows
+        int cnt = 0;
+        while (resultSet.next()) {
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertTrue(actualTime >= lastTime);
+          if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+            assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          index++;
+          if (index % 10 == 0) {
+            cnt++;
+          }
+        }
+        assertEquals(30, index);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time aggregation with ORDER BY TIME DESC and ALIGN BY DEVICE. Timestamps must never
+   * increase, devices must never decrease among rows sharing a timestamp, every aggregate must
+   * match the precomputed expectation, and exactly 30 rows are expected.
+   */
+  @Test
+  public void groupByTimeOrderByTimeTest2() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME DESC ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = Long.MAX_VALUE;
+      String lastDevice = "";
+      int index = 0;
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the expected time window; it steps back after every 10 rows
+        int cnt = 2;
+        while (resultSet.next()) {
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertTrue(actualTime <= lastTime);
+          if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+            assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          index++;
+          if (index % 10 == 0) {
+            cnt--;
+          }
+        }
+        assertEquals(30, index);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time aggregation with ORDER BY TIME ASC,DEVICE DESC and ALIGN BY DEVICE. Timestamps
+   * must never decrease, devices must never increase among rows sharing a timestamp, every
+   * aggregate must match the precomputed expectation, and exactly 30 rows are expected.
+   */
+  @Test
+  public void groupByTimeOrderByTimeTest3() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME ASC,DEVICE DESC ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = -1;
+      String lastDevice = "";
+      int index = 0;
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the expected time window; it advances after every 10 rows
+        int cnt = 0;
+        while (resultSet.next()) {
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertTrue(actualTime >= lastTime);
+          if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+            assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          index++;
+          if (index % 10 == 0) {
+            cnt++;
+          }
+        }
+        assertEquals(30, index);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time aggregation with ORDER BY TIME DESC,DEVICE DESC and ALIGN BY DEVICE. Timestamps
+   * must never increase, devices must never increase among rows sharing a timestamp, every
+   * aggregate must match the precomputed expectation, and exactly 30 rows are expected.
+   */
+  @Test
+  public void groupByTimeOrderByTimeTest4() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME DESC,DEVICE DESC ALIGN BY DEVICE";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      long lastTime = Long.MAX_VALUE;
+      String lastDevice = "";
+      int index = 0;
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+        // cnt is the index of the expected time window; it steps back after every 10 rows
+        int cnt = 2;
+        while (resultSet.next()) {
+          long actualTime = resultSet.getLong(1);
+          String actualDevice = resultSet.getString(2);
+          assertTrue(actualTime <= lastTime);
+          if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+            assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+          }
+          lastTime = actualTime;
+          lastDevice = actualDevice;
+          // doubles are compared by absolute difference so that deviations in both directions fail
+          double avgPrecipitation = resultSet.getDouble(3);
+          assertTrue(
+              Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                  < 0.00001);
+
+          double avgTemperature = resultSet.getDouble(4);
+          assertTrue(
+              Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+          int countPrecipitation = resultSet.getInt(5);
+          assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+          int countTemperature = resultSet.getInt(6);
+          assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+          long maxPrecipitation = resultSet.getLong(7);
+          assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+          double maxTemperature = resultSet.getDouble(8);
+          assertTrue(
+              Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+          index++;
+          if (index % 10 == 0) {
+            cnt--;
+          }
+        }
+        assertEquals(30, index);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
index 6b066a5905..0346dcb0f8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
@@ -203,7 +203,7 @@ public class IoTDBNullValueFillIT {
           "1,root.sg1.d2,1,2,1.0,2.0,true,t2,"
         };
     resultSetEqualTest(
-        "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) order by time desc align by device",
+        "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) order by device,time desc align by device",
         expectedAlignByDeviceHeader,
         retArray);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
new file mode 100644
index 0000000000..372d01cb23
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class MergeSortOperator implements ProcessOperator {
+  // Merges the TsBlocks of several child operators into one output stream ordered by the given comparator, using a heap of per-child row cursors (MergeSortKey).
+  private final OperatorContext operatorContext;
+  private final List<Operator> inputOperators;
+  private final List<TSDataType> dataTypes; // value column types used for the output TsBlockBuilder
+  private final TsBlockBuilder tsBlockBuilder; // reused (reset) for every merged output block
+  private final int inputOperatorsCount;
+  private final TsBlock[] inputTsBlocks; // buffered TsBlock of each child; set to null once fully consumed
+  private final boolean[] noMoreTsBlocks; // noMoreTsBlocks[i] is set when child i reported hasNext() == false
+  private final MergeSortHeap mergeSortHeap; // row cursors into the buffered blocks, ordered by comparator
+  private final Comparator<MergeSortKey> comparator;
+
+  private boolean finished; // cached once isFinished() has evaluated to true
+  /** Creates the operator; one buffer slot and one exhausted-flag are allocated per input operator. */
+  public MergeSortOperator(
+      OperatorContext operatorContext,
+      List<Operator> inputOperators,
+      List<TSDataType> dataTypes,
+      Comparator<MergeSortKey> comparator) {
+    this.operatorContext = operatorContext;
+    this.inputOperators = inputOperators;
+    this.dataTypes = dataTypes;
+    this.inputOperatorsCount = inputOperators.size();
+    this.mergeSortHeap = new MergeSortHeap(inputOperatorsCount, comparator);
+    this.comparator = comparator;
+    this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+  /** Returns NOT_BLOCKED unless a non-exhausted child with an empty buffer is still blocked; then the futures of those children are combined. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) { // only children that must be refilled matter
+        ListenableFuture<?> blocked = inputOperators.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          listenableFutures.add(blocked);
+        }
+      }
+    }
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+  }
+
+  /** If the tsBlock is null or has no more data in the tsBlock, return true; else return false; */
+  private boolean isTsBlockEmpty(int tsBlockIndex) {
+    return inputTsBlocks[tsBlockIndex] == null
+        || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
+  }
+  /** Returns the next output TsBlock, or null when a child that has data returned a null or empty block during the refill step. */
+  @Override
+  public TsBlock next() {
+    // 1. fill consumed up TsBlock
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && inputOperators.get(i).hasNext()) {
+        inputTsBlocks[i] = inputOperators.get(i).next();
+        if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
+          return null; // child i delivered no rows in this call, so nothing is emitted
+        }
+        mergeSortHeap.push(new MergeSortKey(inputTsBlocks[i], 0, i)); // cursor at row 0 of child i's block
+      }
+    }
+
+    // 2. check if we can directly return the original TsBlock instead of merging way
+    MergeSortKey minMergeSortKey = mergeSortHeap.poll(); // NOTE(review): relies on the heap being non-empty here - confirm callers guarantee it via hasNext()
+    if (mergeSortHeap.isEmpty()
+        || comparator.compare(
+                new MergeSortKey(
+                    minMergeSortKey.tsBlock, minMergeSortKey.tsBlock.getPositionCount() - 1), // key for the last row of the polled block
+                mergeSortHeap.peek())
+            < 0) {
+      inputTsBlocks[minMergeSortKey.columnIndex] = null; // the rest of this block is handed out as a whole
+      return minMergeSortKey.rowIndex == 0
+          ? minMergeSortKey.tsBlock
+          : minMergeSortKey.tsBlock.subTsBlock(minMergeSortKey.rowIndex);
+    }
+    mergeSortHeap.push(minMergeSortKey); // no shortcut possible: put the cursor back and merge row by row
+
+    // 3. do merge sort until one TsBlock is consumed up
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    while (!mergeSortHeap.isEmpty()) {
+      MergeSortKey mergeSortKey = mergeSortHeap.poll();
+      TsBlock targetBlock = mergeSortKey.tsBlock;
+      int rowIndex = mergeSortKey.rowIndex;
+      timeBuilder.writeLong(targetBlock.getTimeByIndex(rowIndex));
+      for (int i = 0; i < valueColumnBuilders.length; i++) { // copy the row column by column, keeping nulls
+        if (targetBlock.getColumn(i).isNull(rowIndex)) {
+          valueColumnBuilders[i].appendNull();
+          continue;
+        }
+        valueColumnBuilders[i].write(targetBlock.getColumn(i), rowIndex);
+      }
+      tsBlockBuilder.declarePosition();
+      if (mergeSortKey.rowIndex == mergeSortKey.tsBlock.getPositionCount() - 1) { // last row of this block was just written
+        inputTsBlocks[mergeSortKey.columnIndex] = null;
+        if (!mergeSortHeap.isEmpty()
+            && comparator.compare(mergeSortHeap.peek(), mergeSortKey) > 0) {
+          break; // presumably the consumed child must be refilled before rows that sort after it are emitted - TODO confirm
+        }
+      } else {
+        mergeSortKey.rowIndex++;
+        mergeSortHeap.push(mergeSortKey);
+      }
+    }
+    return tsBlockBuilder.build();
+  }
+  /** Returns true while a buffered block has rows or a child has more data; as a side effect marks children without more data as exhausted. */
+  @Override
+  public boolean hasNext() {
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isTsBlockEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (inputOperators.get(i).hasNext()) {
+          return true;
+        } else {
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
+      }
+    }
+    return false;
+  }
+  /** Closes the input operators in order. */
+  @Override
+  public void close() throws Exception {
+    for (Operator operator : inputOperators) {
+      operator.close();
+    }
+  }
+  /** Returns true once every child is marked exhausted and every buffered block is empty; a true result is cached in finished. */
+  @Override
+  public boolean isFinished() {
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+  /** Returns the largest of: one page size plus every child's max return and retained size, any child's own peak, and this operator's max return size. */
+  @Override
+  public long calculateMaxPeekMemory() {
+    // MergeToolKit will cache startKey and endKey (NOTE(review): this class does not reference MergeToolKit; the remark may be stale)
+    long maxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    // inputTsBlocks will cache all the tsBlocks returned by inputOperators
+    for (Operator operator : inputOperators) {
+      maxPeekMemory += operator.calculateMaxReturnSize();
+      maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    for (Operator operator : inputOperators) {
+      maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, calculateMaxReturnSize());
+  }
+  /** Returns (1 + number of value columns) times the configured page size in bytes. */
+  @Override
+  public long calculateMaxReturnSize() {
+    return (1L + dataTypes.size()) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+  /** Returns the sum of every child's max return size and retained size, minus the smallest child max return size. */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : inputOperators) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+    }
+    return currentRetainedSize - minChildReturnSize;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
new file mode 100644
index 0000000000..0107af8121
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Adds a device column to the result of one single device, in the same spirit as
+ * DeviceViewOperator.
+ *
+ * <p>Unlike DeviceViewOperator, which merges the results of several devices, this operator wraps
+ * exactly one child. Each TsBlock of the child is laid out again in the output schema: column 0
+ * carries the device name, the child's columns are placed at their mapped positions, and every
+ * remaining position becomes a null column. The time column and the row count are passed through
+ * unchanged.
+ */
+public class SingleDeviceViewOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final Operator deviceOperator;
+  // deviceColumnIndex.get(i) is the output position of the child's i-th column; output positions
+  // that are not listed are emitted as null columns because this device does not have them.
+  private final List<Integer> deviceColumnIndex;
+  // Data types of the output columns, including the device column.
+  private final List<TSDataType> dataTypes;
+  // One-row column holding the device name; it is run-length repeated for every emitted TsBlock.
+  private final BinaryColumn deviceNameColumn;
+
+  public SingleDeviceViewOperator(
+      OperatorContext operatorContext,
+      String device,
+      Operator deviceOperator,
+      List<Integer> deviceColumnIndex,
+      List<TSDataType> dataTypes) {
+    this.operatorContext = operatorContext;
+    this.deviceOperator = deviceOperator;
+    this.deviceColumnIndex = deviceColumnIndex;
+    this.dataTypes = dataTypes;
+    Binary[] deviceName = new Binary[] {new Binary(device)};
+    this.deviceNameColumn = new BinaryColumn(1, Optional.empty(), deviceName);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  /** Reports the child's future while it is still pending, NOT_BLOCKED otherwise. */
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> childFuture = deviceOperator.isBlocked();
+    if (childFuture.isDone()) {
+      return NOT_BLOCKED;
+    }
+    return childFuture;
+  }
+
+  /**
+   * Pulls the next TsBlock from the child and rearranges it into the output schema.
+   *
+   * @return the rearranged TsBlock, or null when the child returned null
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock childBlock = deviceOperator.next();
+    if (childBlock == null) {
+      return null;
+    }
+    int rowCount = childBlock.getPositionCount();
+    Column[] outputColumns = new Column[dataTypes.size()];
+    // place the child's columns at their positions in the output schema
+    for (int i = 0; i < deviceColumnIndex.size(); i++) {
+      outputColumns[deviceColumnIndex.get(i)] = childBlock.getColumn(i);
+    }
+    // the device column always occupies position 0
+    outputColumns[0] = new RunLengthEncodedColumn(deviceNameColumn, rowCount);
+    // every position that is still unset is filled with nulls of the expected type
+    for (int i = 0; i < outputColumns.length; i++) {
+      if (outputColumns[i] == null) {
+        outputColumns[i] = NullColumn.create(dataTypes.get(i), rowCount);
+      }
+    }
+    return new TsBlock(rowCount, childBlock.getTimeColumn(), outputColumns);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return deviceOperator.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    deviceOperator.close();
+  }
+
+  /** This operator is finished exactly when {@link #hasNext()} reports false. */
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  /**
+   * Returns the larger of this operator's own need (max return size plus retained size) and the
+   * child's peak memory.
+   */
+  @Override
+  public long calculateMaxPeekMemory() {
+    long ownNeed = calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext();
+    return Math.max(ownNeed, deviceOperator.calculateMaxPeekMemory());
+  }
+
+  /** Returns the configured page size multiplied by the number of output columns. */
+  @Override
+  public long calculateMaxReturnSize() {
+    long columnCount = dataTypes.size();
+    return columnCount * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  /** Delegates to the child's retained size. */
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return deviceOperator.calculateRetainedSizeAfterCallingNext();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
new file mode 100644
index 0000000000..da17deed4e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
+
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Row comparators used when merging results that are ordered by time and device.
+ *
+ * <p>Each comparator reads the timestamp of a row through {@code tsBlock.getTimeByIndex(rowIndex)}
+ * and the device name from column 0 of the row's TsBlock. The constant names list the sort keys
+ * in priority order: {@link #ASC_TIME_DESC_DEVICE}, for example, orders by ascending time and
+ * breaks ties by descending device.
+ *
+ * <p>Timestamps are compared with {@link Long#compare(long, long)}. Narrowing the difference of
+ * two long timestamps to int gives a wrong sign, or a false tie, as soon as the two timestamps
+ * are more than Integer.MAX_VALUE apart, which breaks the Comparator contract.
+ */
+public class MergeSortComparator {
+
+  public static final Comparator<MergeSortKey> ASC_TIME_ASC_DEVICE =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int timeComparing = compareTime(o1, o2);
+        return timeComparing == 0 ? compareDevice(o1, o2) : timeComparing;
+      };
+  public static final Comparator<MergeSortKey> ASC_TIME_DESC_DEVICE =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int timeComparing = compareTime(o1, o2);
+        return timeComparing == 0 ? compareDevice(o2, o1) : timeComparing;
+      };
+  public static final Comparator<MergeSortKey> DESC_TIME_ASC_DEVICE =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int timeComparing = compareTime(o2, o1);
+        return timeComparing == 0 ? compareDevice(o1, o2) : timeComparing;
+      };
+  public static final Comparator<MergeSortKey> DESC_TIME_DESC_DEVICE =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int timeComparing = compareTime(o2, o1);
+        return timeComparing == 0 ? compareDevice(o2, o1) : timeComparing;
+      };
+
+  public static final Comparator<MergeSortKey> ASC_DEVICE_ASC_TIME =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int deviceComparing = compareDevice(o1, o2);
+        return deviceComparing == 0 ? compareTime(o1, o2) : deviceComparing;
+      };
+  public static final Comparator<MergeSortKey> ASC_DEVICE_DESC_TIME =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int deviceComparing = compareDevice(o1, o2);
+        return deviceComparing == 0 ? compareTime(o2, o1) : deviceComparing;
+      };
+  public static final Comparator<MergeSortKey> DESC_DEVICE_ASC_TIME =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int deviceComparing = compareDevice(o2, o1);
+        return deviceComparing == 0 ? compareTime(o1, o2) : deviceComparing;
+      };
+  public static final Comparator<MergeSortKey> DESC_DEVICE_DESC_TIME =
+      (MergeSortKey o1, MergeSortKey o2) -> {
+        int deviceComparing = compareDevice(o2, o1);
+        return deviceComparing == 0 ? compareTime(o2, o1) : deviceComparing;
+      };
+
+  /**
+   * Selects the comparator that matches a two-key sort specification.
+   *
+   * <p>Only the first two items are read. The first item decides the primary key: TIME selects a
+   * time-first comparator, any other key selects a device-first comparator. The orderings of the
+   * first and second items decide the direction of the primary and secondary key respectively.
+   *
+   * @param sortItemList sort items in priority order; must hold at least two entries, otherwise
+   *     the underlying {@code List.get} call fails
+   * @return one of the eight comparator constants of this class
+   */
+  public static Comparator<MergeSortKey> getComparator(List<SortItem> sortItemList) {
+    SortItem primary = sortItemList.get(0);
+    boolean primaryAsc = primary.getOrdering() == Ordering.ASC;
+    boolean secondaryAsc = sortItemList.get(1).getOrdering() == Ordering.ASC;
+
+    if (primary.getSortKey() == SortKey.TIME) {
+      if (primaryAsc) {
+        return secondaryAsc ? ASC_TIME_ASC_DEVICE : ASC_TIME_DESC_DEVICE;
+      }
+      return secondaryAsc ? DESC_TIME_ASC_DEVICE : DESC_TIME_DESC_DEVICE;
+    }
+    if (primaryAsc) {
+      return secondaryAsc ? ASC_DEVICE_ASC_TIME : ASC_DEVICE_DESC_TIME;
+    }
+    return secondaryAsc ? DESC_DEVICE_ASC_TIME : DESC_DEVICE_DESC_TIME;
+  }
+
+  /** Compares the timestamps of two rows in ascending order, safe for the full long range. */
+  private static int compareTime(MergeSortKey left, MergeSortKey right) {
+    return Long.compare(
+        left.tsBlock.getTimeByIndex(left.rowIndex), right.tsBlock.getTimeByIndex(right.rowIndex));
+  }
+
+  /** Compares the device names (column 0) of two rows in ascending order. */
+  private static int compareDevice(MergeSortKey left, MergeSortKey right) {
+    return left.tsBlock
+        .getColumn(0)
+        .getBinary(left.rowIndex)
+        .compareTo(right.tsBlock.getColumn(0).getBinary(right.rowIndex));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index e2497d44c9..351303134f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -55,6 +56,8 @@ public class LocalExecutionPlanContext {
 
   private final TypeProvider typeProvider;
 
+  private List<TSDataType> cachedDataTypes;
+
   // left is cached last value in last query
   // right is full path for each cached last value
   private List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList;
@@ -147,6 +150,14 @@ public class LocalExecutionPlanContext {
     this.sinkHandle = sinkHandle;
   }
 
+  /**
+   * Stores a list of column data types so that it can be read back later through
+   * {@link #getCachedDataTypes()}.
+   *
+   * <p>OperatorTreeGenerator#visitMergeSort stores the output column types of the merge sort
+   * here before it visits its children.
+   *
+   * @param cachedDataTypes the data types to cache; replaces any previously stored list
+   */
+  public void setCachedDataTypes(List<TSDataType> cachedDataTypes) {
+    this.cachedDataTypes = cachedDataTypes;
+  }
+
+  /**
+   * Returns the list last stored through {@link #setCachedDataTypes(List)}.
+   *
+   * <p>OperatorTreeGenerator#visitSingleDeviceView reads this value and rejects a null or empty
+   * list, so callers should not assume it has been set.
+   */
+  public List<TSDataType> getCachedDataTypes() {
+    return cachedDataTypes;
+  }
+
   public TypeProvider getTypeProvider() {
     return typeProvider;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index a04f877064..3a7f1eeb6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -60,7 +60,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -516,32 +518,73 @@ public class LogicalPlanBuilder {
       Map<String, PlanNode> deviceNameToSourceNodesMap,
       Set<Expression> deviceViewOutputExpressions,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap,
-      Ordering mergeOrder) {
+      List<SortItem> sortItemList) {
     List<String> outputColumnNames =
         deviceViewOutputExpressions.stream()
             .map(Expression::getExpressionString)
             .collect(Collectors.toList());
 
-    DeviceViewNode deviceViewNode =
-        new DeviceViewNode(
-            context.getQueryId().genPlanNodeId(),
-            new OrderByParameter(
-                Arrays.asList(
-                    new SortItem(SortKey.DEVICE, Ordering.ASC),
-                    new SortItem(SortKey.TIME, mergeOrder))),
-            outputColumnNames,
-            deviceToMeasurementIndexesMap);
-
-    for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
-      String deviceName = entry.getKey();
-      PlanNode subPlan = entry.getValue();
-      deviceViewNode.addChildDeviceNode(deviceName, subPlan);
+    int timePriority = -1, devicePriority = -1;
+    for (int i = 0; i < sortItemList.size(); i++) {
+      SortKey sortKey = sortItemList.get(i).getSortKey();
+      if (sortKey == SortKey.TIME) {
+        timePriority = sortItemList.size() - i;
+      } else if (sortKey == SortKey.DEVICE) {
+        devicePriority = sortItemList.size() - i;
+      }
+    }
+    Ordering deviceOrdering =
+        devicePriority == -1
+            ? Ordering.ASC
+            : sortItemList.get(sortItemList.size() - devicePriority).getOrdering();
+    Ordering timeOrdering =
+        timePriority == -1
+            ? Ordering.ASC
+            : sortItemList.get(sortItemList.size() - timePriority).getOrdering();
+
+    if ((timePriority == -1 && devicePriority == -1) || devicePriority > timePriority) {
+      DeviceViewNode deviceViewNode =
+          new DeviceViewNode(
+              context.getQueryId().genPlanNodeId(),
+              new OrderByParameter(
+                  Arrays.asList(
+                      new SortItem(SortKey.DEVICE, deviceOrdering),
+                      new SortItem(SortKey.TIME, timeOrdering))),
+              outputColumnNames,
+              deviceToMeasurementIndexesMap);
+
+      for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
+        String deviceName = entry.getKey();
+        PlanNode subPlan = entry.getValue();
+        deviceViewNode.addChildDeviceNode(deviceName, subPlan);
+      }
+      this.root = deviceViewNode;
+    } else {
+      MergeSortNode mergeSortNode =
+          new MergeSortNode(
+              context.getQueryId().genPlanNodeId(),
+              new OrderByParameter(
+                  Arrays.asList(
+                      new SortItem(SortKey.TIME, timeOrdering),
+                      new SortItem(SortKey.DEVICE, deviceOrdering))),
+              outputColumnNames);
+      for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
+        String deviceName = entry.getKey();
+        PlanNode subPlan = entry.getValue();
+        SingleDeviceViewNode singleDeviceViewNode =
+            new SingleDeviceViewNode(
+                context.getQueryId().genPlanNodeId(),
+                outputColumnNames,
+                deviceName,
+                deviceToMeasurementIndexesMap.get(deviceName));
+        singleDeviceViewNode.addChild(subPlan);
+        mergeSortNode.addChild(singleDeviceViewNode);
+      }
+      this.root = mergeSortNode;
     }
 
     context.getTypeProvider().setType(DEVICE, TSDataType.TEXT);
     updateTypeProvider(deviceViewOutputExpressions);
-
-    this.root = deviceViewNode;
     return this;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index cbbc9d4131..d9a458b4c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
@@ -67,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplate
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -104,7 +106,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
     }
 
     if (queryStatement.isAlignByDevice()) {
-      Map<String, PlanNode> deviceToSubPlanMap = new TreeMap<>();
+      Map<String, PlanNode> deviceToSubPlanMap =
+          queryStatement.getResultDeviceOrder() == Ordering.ASC
+              ? new TreeMap<>()
+              : new TreeMap<>(Collections.reverseOrder());
       for (String deviceName : analysis.getDeviceToSourceExpressions().keySet()) {
         LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(analysis, context);
         subPlanBuilder =
@@ -127,7 +132,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
               deviceToSubPlanMap,
               analysis.getDeviceViewOutputExpressions(),
               analysis.getDeviceViewInputIndexesMap(),
-              queryStatement.getResultTimeOrder());
+              queryStatement.getSortItemList());
     } else {
       planBuilder =
           planBuilder.withNewRoot(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1a22949343..2af0bd0708 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -48,9 +48,11 @@ import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperat
 import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.MergeSortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
@@ -80,6 +82,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.VerticallyConcatO
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MergeSortComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
@@ -146,8 +149,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -646,6 +651,27 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return new NodePathsCountOperator(operatorContext, child);
   }
 
+  /**
+   * Builds a SingleDeviceViewOperator on top of the operator generated for the node's only child.
+   *
+   * <p>The output column types are not derived from the node itself; they are taken from the
+   * types cached on the context, which visitMergeSort stores before it visits its children.
+   *
+   * @throws IllegalStateException if no output column types have been cached on the context
+   */
+  @Override
+  public Operator visitSingleDeviceView(
+      SingleDeviceViewNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                SingleDeviceViewOperator.class.getSimpleName());
+    Operator childOperator = node.getChild().accept(this, context);
+    List<Integer> measurementIndexes = node.getDeviceToMeasurementIndexes();
+    List<TSDataType> cachedTypes = context.getCachedDataTypes();
+    if (cachedTypes == null || cachedTypes.isEmpty()) {
+      throw new IllegalStateException("OutputColumTypes should not be null/empty");
+    }
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new SingleDeviceViewOperator(
+        operatorContext, node.getDevice(), childOperator, measurementIndexes, cachedTypes);
+  }
+
   @Override
   public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
@@ -705,6 +731,28 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         operatorContext, node.getDevices(), children, dataTypes, selector, timeComparator);
   }
 
+  /**
+   * Builds a MergeSortOperator over the operators generated for all children of the node.
+   *
+   * <p>The output column types are cached on the context before the children are visited,
+   * because visitSingleDeviceView reads them from there.
+   */
+  @Override
+  public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                MergeSortOperator.class.getSimpleName());
+    // Must happen before the children are visited, see the method comment.
+    List<TSDataType> outputTypes = getOutputColumnTypes(node, context.getTypeProvider());
+    context.setCachedDataTypes(outputTypes);
+    List<Operator> childOperators =
+        node.getChildren().stream()
+            .map(childNode -> childNode.accept(this, context))
+            .collect(Collectors.toList());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+    return new MergeSortOperator(
+        operatorContext,
+        childOperators,
+        outputTypes,
+        MergeSortComparator.getComparator(node.getMergeOrderParameter().getSortItemList()));
+  }
+
   @Override
   public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
     Operator child = node.getChild().accept(this, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 7aafb9198c..965dcb1b56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -51,7 +51,13 @@ public class DistributionPlanner {
 
   public PlanNode rewriteSource() {
     SourceRewriter rewriter = new SourceRewriter(this.analysis);
-    return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
+    // The rewriter returns a list of nodes; for the root exactly one node is expected.
+    List<PlanNode> rewrittenRoots =
+        rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
+    if (rewrittenRoots.size() == 1) {
+      return rewrittenRoots.get(0);
+    }
+    throw new IllegalStateException("root node must return only one");
   }
 
   public PlanNode addExchangeNode(PlanNode root) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index a9590d995a..82b56565bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -39,7 +39,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import java.util.ArrayList;
@@ -207,6 +210,83 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return processMultiChildNode(node, context);
   }
 
+  /** Handles a SingleDeviceViewNode by delegating to {@code processOneChildNode}. */
+  @Override
+  public PlanNode visitSingleDeviceView(SingleDeviceViewNode node, NodeGroupContext context) {
+    return processOneChildNode(node, context);
+  }
+
+  /**
+   * Splits a MergeSortNode along data regions.
+   *
+   * <p>Every child is visited first and grouped by the region recorded for it in the context.
+   * A group with several children gets its own MergeSortNode, registered in the region of its
+   * first child; a group with a single child is used as it is. The per-region results are then
+   * handed to {@code groupPlanNodeByMergeSortNode}.
+   */
+  @Override
+  public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) {
+    // 1. Visit the children and group them by data region
+    Map<TRegionReplicaSet, List<PlanNode>> regionToChildren = new HashMap<>();
+    for (PlanNode rawChild : node.getChildren()) {
+      PlanNode visitedChild = visit(rawChild, context);
+      TRegionReplicaSet region = context.getNodeDistribution(visitedChild.getPlanNodeId()).region;
+      regionToChildren.computeIfAbsent(region, k -> new ArrayList<>()).add(visitedChild);
+    }
+
+    // 2. Build one root per region: a MergeSortNode when the region holds several children,
+    //    the child itself when it is alone
+    List<PlanNode> perRegionRoots = new ArrayList<>();
+    for (List<PlanNode> sameRegionChildren : regionToChildren.values()) {
+      if (sameRegionChildren.size() > 1) {
+        MergeSortNode regionMerge =
+            new MergeSortNode(
+                context.queryContext.getQueryId().genPlanNodeId(),
+                node.getMergeOrderParameter(),
+                node.getOutputColumnNames());
+        for (PlanNode sameRegionChild : sameRegionChildren) {
+          regionMerge.addChild(sameRegionChild);
+        }
+        TRegionReplicaSet regionOfFirstChild =
+            context.getNodeDistribution(regionMerge.getChildren().get(0).getPlanNodeId()).region;
+        context.putNodeDistribution(
+            regionMerge.getPlanNodeId(),
+            new NodeDistribution(
+                NodeDistributionType.SAME_WITH_ALL_CHILDREN, regionOfFirstChild));
+        perRegionRoots.add(regionMerge);
+      } else {
+        perRegionRoots.add(sameRegionChildren.get(0));
+      }
+    }
+
+    return groupPlanNodeByMergeSortNode(
+        perRegionRoots, node.getOutputColumnNames(), node.getMergeOrderParameter(), context);
+  }
+
+  /**
+   * Combines the given nodes under a single MergeSortNode.
+   *
+   * <p>A list with exactly one entry is returned as that entry. Otherwise a new MergeSortNode is
+   * created and registered in the region of the first entry, which becomes its direct child;
+   * every other entry is attached through a new ExchangeNode.
+   *
+   * @param mergeSortNodeList the nodes to combine
+   * @param outputColumns output column names for the new MergeSortNode
+   * @param orderByParameter merge order for the new MergeSortNode
+   * @param context supplies plan node ids and records the node distribution
+   * @return the single entry, or the newly created MergeSortNode
+   */
+  private PlanNode groupPlanNodeByMergeSortNode(
+      List<PlanNode> mergeSortNodeList,
+      List<String> outputColumns,
+      OrderByParameter orderByParameter,
+      NodeGroupContext context) {
+    if (mergeSortNodeList.size() == 1) {
+      return mergeSortNodeList.get(0);
+    }
+
+    MergeSortNode topMerge =
+        new MergeSortNode(
+            context.queryContext.getQueryId().genPlanNodeId(), orderByParameter, outputColumns);
+
+    // Per the original note every entry has a different TRegionReplicaSet, so any of them may
+    // host the new node; the first one is chosen.
+    PlanNode localChild = mergeSortNodeList.get(0);
+    topMerge.addChild(localChild);
+    TRegionReplicaSet hostRegion = context.getNodeDistribution(localChild.getPlanNodeId()).region;
+    context.putNodeDistribution(
+        topMerge.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.SAME_WITH_SOME_CHILD, hostRegion));
+
+    // all remaining entries are attached through an ExchangeNode
+    for (PlanNode remoteChild : mergeSortNodeList.subList(1, mergeSortNodeList.size())) {
+      ExchangeNode exchange = new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+      exchange.setChild(remoteChild);
+      exchange.setOutputColumnNames(remoteChild.getOutputColumnNames());
+      topMerge.addChild(exchange);
+    }
+
+    return topMerge;
+  }
+
   @Override
   public PlanNode visitLastQueryMerge(LastQueryMergeNode node, NodeGroupContext context) {
     return processMultiChildNode(node, context);
@@ -274,9 +354,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
       DeviceViewGroup group = deviceViewGroupMap.computeIfAbsent(region, DeviceViewGroup::new);
       group.addChild(device, visitedChild);
     }
-
     // Generate DeviceViewNode for each group
-    List<DeviceViewNode> deviceViewNodeList = new ArrayList<>();
+    List<PlanNode> deviceViewNodeList = new ArrayList<>();
     for (DeviceViewGroup group : deviceViewGroupMap.values()) {
       DeviceViewNode deviceViewNode =
           new DeviceViewNode(
@@ -296,35 +375,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
       deviceViewNodeList.add(deviceViewNode);
     }
 
-    if (deviceViewNodeList.size() == 1) {
-      return deviceViewNodeList.get(0);
-    }
-
-    DeviceMergeNode deviceMergeNode =
-        new DeviceMergeNode(
-            context.queryContext.getQueryId().genPlanNodeId(),
-            node.getMergeOrderParameter(),
-            node.getDevices());
-
-    // Each child of deviceMergeNode has different TRegionReplicaSet, so we can select any one from
-    // its child
-    deviceMergeNode.addChild(deviceViewNodeList.get(0));
-    context.putNodeDistribution(
-        deviceMergeNode.getPlanNodeId(),
-        new NodeDistribution(
-            NodeDistributionType.SAME_WITH_SOME_CHILD,
-            context.getNodeDistribution(deviceViewNodeList.get(0).getPlanNodeId()).region));
-
-    // Add ExchangeNode for any other child except first one
-    for (int i = 1; i < deviceViewNodeList.size(); i++) {
-      PlanNode child = deviceViewNodeList.get(i);
-      ExchangeNode exchangeNode =
-          new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
-      exchangeNode.setChild(child);
-      exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-      deviceMergeNode.addChild(exchangeNode);
-    }
-    return deviceMergeNode;
+    return groupPlanNodeByMergeSortNode(
+        deviceViewNodeList, node.getOutputColumnNames(), node.getMergeOrderParameter(), context);
   }
 
   private static class DeviceViewGroup {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 42ace5e178..20d703a945 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -36,11 +36,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
@@ -85,7 +86,65 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   @Override
-  public PlanNode visitDeviceView(DeviceViewNode node, DistributionPlanContext context) {
+  public List<PlanNode> visitMergeSort(MergeSortNode node, DistributionPlanContext context) {
+    MergeSortNode newRoot = cloneMergeSortNodeWithoutChild(node, context);
+    for (int i = 0; i < node.getChildren().size(); i++) {
+      List<PlanNode> rewroteNodes = rewrite(node.getChildren().get(i), context);
+      rewroteNodes.forEach(newRoot::addChild);
+    }
+    return Collections.singletonList(newRoot);
+  }
+
+  private MergeSortNode cloneMergeSortNodeWithoutChild(
+      MergeSortNode node, DistributionPlanContext context) {
+    return new MergeSortNode(
+        context.queryContext.getQueryId().genPlanNodeId(),
+        node.getMergeOrderParameter(),
+        node.getOutputColumnNames());
+  }
+
+  @Override
+  public List<PlanNode> visitSingleDeviceView(
+      SingleDeviceViewNode node, DistributionPlanContext context) {
+
+    if (isAggregationQuery()) {
+      List<PlanNode> rewroteChildren = rewrite(node.getChild(), context);
+      if (rewroteChildren.size() != 1) {
+        throw new IllegalStateException("SingleDeviceViewNode have only one child");
+      }
+      node.setChild(rewroteChildren.get(0));
+      return Collections.singletonList(node);
+    }
+
+    String device = node.getDevice();
+    List<TRegionReplicaSet> regionReplicaSets =
+        analysis.getPartitionInfo(device, analysis.getGlobalTimeFilter());
+
+    List<PlanNode> singleDeviceViewList = new ArrayList<>();
+    for (TRegionReplicaSet tRegionReplicaSet : regionReplicaSets) {
+      singleDeviceViewList.add(
+          buildSingleDeviceViewNodeInRegion(node, tRegionReplicaSet, context.queryContext));
+    }
+
+    return singleDeviceViewList;
+  }
+
+  private PlanNode buildSingleDeviceViewNodeInRegion(
+      PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+    List<PlanNode> children =
+        root.getChildren().stream()
+            .map(child -> buildSingleDeviceViewNodeInRegion(child, regionReplicaSet, context))
+            .collect(Collectors.toList());
+    PlanNode newRoot = root.cloneWithChildren(children);
+    newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
+    if (newRoot instanceof SourceNode) {
+      ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
+    }
+    return newRoot;
+  }
+
+  @Override
+  public List<PlanNode> visitDeviceView(DeviceViewNode node, DistributionPlanContext context) {
     checkArgument(
         node.getDevices().size() == node.getChildren().size(),
         "size of devices and its children in DeviceViewNode should be same");
@@ -109,13 +168,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       relatedDataRegions.addAll(regionReplicaSets);
     }
 
-    DeviceMergeNode deviceMergeNode =
-        new DeviceMergeNode(
-            context.queryContext.getQueryId().genPlanNodeId(),
-            node.getMergeOrderParameter(),
-            node.getDevices());
-
     // Step 2: Iterate all partition and create DeviceViewNode for each region
+    List<PlanNode> deviceViewNodeList = new ArrayList<>();
     for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
       List<String> devices = new ArrayList<>();
       List<PlanNode> children = new ArrayList<>();
@@ -129,20 +183,34 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       for (int i = 0; i < devices.size(); i++) {
         regionDeviceViewNode.addChildDeviceNode(devices.get(i), children.get(i));
       }
-      deviceMergeNode.addChild(regionDeviceViewNode);
+      deviceViewNodeList.add(regionDeviceViewNode);
     }
 
-    return deviceMergeNode;
+    if (deviceViewNodeList.size() == 1) {
+      return deviceViewNodeList;
+    }
+
+    MergeSortNode mergeSortNode =
+        new MergeSortNode(
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getMergeOrderParameter(),
+            node.getOutputColumnNames());
+    for (PlanNode deviceViewNode : deviceViewNodeList) {
+      mergeSortNode.addChild(deviceViewNode);
+    }
+    return Collections.singletonList(mergeSortNode);
   }
 
-  private PlanNode processDeviceViewWithAggregation(
+  private List<PlanNode> processDeviceViewWithAggregation(
       DeviceViewNode node, DistributionPlanContext context) {
     DeviceViewNode newRoot = cloneDeviceViewNodeWithoutChild(node, context);
     for (int i = 0; i < node.getDevices().size(); i++) {
-      newRoot.addChildDeviceNode(
-          node.getDevices().get(i), rewrite(node.getChildren().get(i), context));
+      List<PlanNode> rewroteNode = rewrite(node.getChildren().get(i), context);
+      for (PlanNode planNode : rewroteNode) {
+        newRoot.addChildDeviceNode(node.getDevices().get(i), planNode);
+      }
     }
-    return newRoot;
+    return Collections.singletonList(newRoot);
   }
 
   private DeviceViewNode cloneDeviceViewNodeWithoutChild(
@@ -192,7 +260,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   @Override
-  public PlanNode visitSchemaQueryMerge(
+  public List<PlanNode> visitSchemaQueryMerge(
       SchemaQueryMergeNode node, DistributionPlanContext context) {
     SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
     SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
@@ -263,11 +331,12 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
                 });
           });
     }
-    return root;
+    return Collections.singletonList(root);
   }
 
   @Override
-  public PlanNode visitCountMerge(CountSchemaMergeNode node, DistributionPlanContext context) {
+  public List<PlanNode> visitCountMerge(
+      CountSchemaMergeNode node, DistributionPlanContext context) {
     CountSchemaMergeNode root = (CountSchemaMergeNode) node.clone();
     SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
     Set<TRegionReplicaSet> schemaRegions = new HashSet<>();
@@ -287,19 +356,19 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
           schemaQueryScanNode.setRegionReplicaSet(region);
           root.addChild(schemaQueryScanNode);
         });
-    return root;
+    return Collections.singletonList(root);
   }
 
   // TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
   @Override
-  public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
+  public List<PlanNode> visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
     return processRawSeriesScan(node, context, timeJoinNode);
   }
 
   @Override
-  public PlanNode visitAlignedSeriesScan(
+  public List<PlanNode> visitAlignedSeriesScan(
       AlignedSeriesScanNode node, DistributionPlanContext context) {
     TimeJoinNode timeJoinNode =
         new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
@@ -307,7 +376,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   @Override
-  public PlanNode visitLastQueryScan(LastQueryScanNode node, DistributionPlanContext context) {
+  public List<PlanNode> visitLastQueryScan(
+      LastQueryScanNode node, DistributionPlanContext context) {
     LastQueryNode mergeNode =
         new LastQueryNode(
             context.queryContext.getQueryId().genPlanNodeId(),
@@ -317,7 +387,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   @Override
-  public PlanNode visitAlignedLastQueryScan(
+  public List<PlanNode> visitAlignedLastQueryScan(
       AlignedLastQueryScanNode node, DistributionPlanContext context) {
     LastQueryNode mergeNode =
         new LastQueryNode(
@@ -327,19 +397,19 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return processRawSeriesScan(node, context, mergeNode);
   }
 
-  private PlanNode processRawSeriesScan(
+  private List<PlanNode> processRawSeriesScan(
       SeriesSourceNode node, DistributionPlanContext context, MultiChildProcessNode parent) {
-    List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
+    List<PlanNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
     if (sourceNodes.size() == 1) {
-      return sourceNodes.get(0);
+      return sourceNodes;
     }
     sourceNodes.forEach(parent::addChild);
-    return parent;
+    return Collections.singletonList(parent);
   }
 
-  private List<SeriesSourceNode> splitSeriesSourceNodeByPartition(
+  private List<PlanNode> splitSeriesSourceNodeByPartition(
       SeriesSourceNode node, DistributionPlanContext context) {
-    List<SeriesSourceNode> ret = new ArrayList<>();
+    List<PlanNode> ret = new ArrayList<>();
     List<TRegionReplicaSet> dataDistribution =
         analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
     if (dataDistribution.size() == 1) {
@@ -358,24 +428,24 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   @Override
-  public PlanNode visitSeriesAggregationScan(
+  public List<PlanNode> visitSeriesAggregationScan(
       SeriesAggregationScanNode node, DistributionPlanContext context) {
     return processSeriesAggregationSource(node, context);
   }
 
   @Override
-  public PlanNode visitAlignedSeriesAggregationScan(
+  public List<PlanNode> visitAlignedSeriesAggregationScan(
       AlignedSeriesAggregationScanNode node, DistributionPlanContext context) {
     return processSeriesAggregationSource(node, context);
   }
 
-  private PlanNode processSeriesAggregationSource(
+  private List<PlanNode> processSeriesAggregationSource(
       SeriesAggregationSourceNode node, DistributionPlanContext context) {
     List<TRegionReplicaSet> dataDistribution =
         analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
     if (dataDistribution.size() == 1) {
       node.setRegionReplicaSet(dataDistribution.get(0));
-      return node;
+      return Collections.singletonList(node);
     }
     List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
@@ -415,11 +485,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       split.setRegionReplicaSet(dataRegion);
       aggregationNode.addChild(split);
     }
-    return aggregationNode;
+    return Collections.singletonList(aggregationNode);
   }
 
   @Override
-  public PlanNode visitSchemaFetchMerge(
+  public List<PlanNode> visitSchemaFetchMerge(
       SchemaFetchMergeNode node, DistributionPlanContext context) {
     SchemaFetchMergeNode root = (SchemaFetchMergeNode) node.clone();
     Map<String, Set<TRegionReplicaSet>> storageGroupSchemaRegionMap = new HashMap<>();
@@ -444,11 +514,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
         root.addChild(schemaFetchScanNode);
       }
     }
-    return root;
+    return Collections.singletonList(root);
   }
 
   @Override
-  public PlanNode visitLastQuery(LastQueryNode node, DistributionPlanContext context) {
+  public List<PlanNode> visitLastQuery(LastQueryNode node, DistributionPlanContext context) {
     // For last query, we need to keep every FI's root node is LastQueryMergeNode. So we
     // force every region group have a parent node even if there is only 1 child for it.
     context.setForceAddParent(true);
@@ -456,9 +526,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     if (context.queryMultiRegion) {
       PlanNode newRoot = genLastQueryRootNode(node, context);
       root.getChildren().forEach(newRoot::addChild);
-      return newRoot;
+      return Collections.singletonList(newRoot);
     } else {
-      return root;
+      return Collections.singletonList(root);
     }
   }
 
@@ -471,14 +541,14 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   @Override
-  public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
+  public List<PlanNode> visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
     // Although some logic is similar between Aggregation and RawDataQuery,
     // we still use separate method to process the distribution planning now
     // to make the planning procedure more clear
     if (containsAggregationSource(node)) {
       return planAggregationWithTimeJoin(node, context);
     }
-    return processRawMultiChildNode(node, context);
+    return Collections.singletonList(processRawMultiChildNode(node, context));
   }
 
   private PlanNode processRawMultiChildNode(
@@ -557,7 +627,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
         // In a general logical query plan, the children of TimeJoinNode should only be
         // SeriesScanNode or SeriesAggregateScanNode
         // So this branch should not be touched.
-        root.addChild(visit(child, context));
+        List<PlanNode> children = visit(child, context);
+        children.forEach(root::addChild);
       }
     }
     return root;
@@ -579,16 +650,17 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
   // This method is only used to process the PlanNodeTree whose root is SlidingWindowAggregationNode
   @Override
-  public PlanNode visitSlidingWindowAggregation(
+  public List<PlanNode> visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, DistributionPlanContext context) {
     DistributionPlanContext childContext = context.copy().setRoot(false);
-    PlanNode child = visit(node.getChild(), childContext);
+    List<PlanNode> children = visit(node.getChild(), childContext);
     PlanNode newRoot = node.clone();
-    newRoot.addChild(child);
-    return newRoot;
+    children.forEach(newRoot::addChild);
+    return Collections.singletonList(newRoot);
   }
 
-  private PlanNode planAggregationWithTimeJoin(TimeJoinNode root, DistributionPlanContext context) {
+  private List<PlanNode> planAggregationWithTimeJoin(
+      TimeJoinNode root, DistributionPlanContext context) {
     Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup;
 
     // construct newRoot
@@ -676,11 +748,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
           }
         });
 
-    return newRoot;
+    return Collections.singletonList(newRoot);
   }
 
   @Override
-  public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
+  public List<PlanNode> visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
     if (shouldUseNaiveAggregation(root)) {
       return defaultRewrite(root, context);
     }
@@ -703,11 +775,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
     // Then, we calculate the attributes for GroupByLevelNode in each level
     calculateGroupByLevelNodeAttributes(newRoot, 0, context);
-    return newRoot;
+    return Collections.singletonList(newRoot);
   }
 
   @Override
-  public PlanNode visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
+  public List<PlanNode> visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
     Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
         splitAggregationSourceByPartition(root, context);
 
@@ -716,10 +788,14 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
             && root.getChildren().get(0) instanceof SlidingWindowAggregationNode;
 
     // TODO: use 2 phase aggregation to optimize the query
-    return containsSlidingWindow
-        ? groupSourcesForGroupByTagWithSlidingWindow(
-            root, (SlidingWindowAggregationNode) root.getChildren().get(0), sourceGroup, context)
-        : groupSourcesForGroupByTag(root, sourceGroup, context);
+    return Collections.singletonList(
+        containsSlidingWindow
+            ? groupSourcesForGroupByTagWithSlidingWindow(
+                root,
+                (SlidingWindowAggregationNode) root.getChildren().get(0),
+                sourceGroup,
+                context)
+            : groupSourcesForGroupByTag(root, sourceGroup, context));
   }
 
   // If the Aggregation Query contains value filter, we need to use the naive query plan
@@ -1040,7 +1116,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return ret;
   }
 
-  public PlanNode visit(PlanNode node, DistributionPlanContext context) {
+  public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index 1ea9a94440..58bbe7b371 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -34,7 +34,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.commons.lang3.Validate;
@@ -145,6 +148,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitSingleDeviceView(SingleDeviceViewNode node, GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("SingleDeviceView-%s", node.getPlanNodeId().getId()));
+    boxValue.add(String.format("DeviceName: %s", node.getDevice()));
+    return render(node, boxValue, context);
+  }
+
   @Override
   public List<String> visitDeviceView(DeviceViewNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
@@ -153,6 +164,20 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitMergeSort(MergeSortNode node, GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("MergeSort-%s", node.getPlanNodeId().getId()));
+    boxValue.add(String.format("ChildrenCount: %d", node.getChildren().size()));
+    StringBuilder sortInfo = new StringBuilder("Order:");
+    for (SortItem sortItem : node.getMergeOrderParameter().getSortItemList()) {
+      sortInfo.append(" ");
+      sortInfo.append(sortItem.getSortKey()).append(" ").append(sortItem.getOrdering());
+    }
+    boxValue.add(sortInfo.toString());
+    return render(node, boxValue, context);
+  }
+
   @Override
   public List<String> visitDeviceMerge(DeviceMergeNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 0f846ad25b..70999dc816 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -58,8 +58,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -152,7 +154,9 @@ public enum PlanNodeType {
   DEACTIVATE_TEMPLATE_NODE((short) 61),
   INTO((short) 62),
   DEVICE_VIEW_INTO((short) 63),
-  VERTICALLY_CONCAT((short) 64);
+  VERTICALLY_CONCAT((short) 64),
+  SINGLE_DEVICE_VIEW((short) 65),
+  MERGE_SORT((short) 66);
 
   public static final int BYTES = Short.BYTES;
 
@@ -331,6 +335,10 @@ public enum PlanNodeType {
         return DeviceViewIntoNode.deserialize(buffer);
       case 64:
         return VerticallyConcatNode.deserialize(buffer);
+      case 65:
+        return SingleDeviceViewNode.deserialize(buffer);
+      case 66:
+        return MergeSortNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8bf41c86ec..f7eb7d5704 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -56,8 +56,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -328,6 +330,14 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitSingleDeviceView(SingleDeviceViewNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitMergeSort(MergeSortNode node, C context) {
+    return visitPlan(node, context);
+  }
+
   public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
     return visitPlan(node, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java
index d7d77ac3ee..2737a22021 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java
@@ -19,30 +19,34 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
+import java.util.Collections;
 import java.util.List;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
+public class SimplePlanNodeRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
   @Override
-  public PlanNode visitPlan(PlanNode node, C context) {
+  public List<PlanNode> visitPlan(PlanNode node, C context) {
     // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
     if (node instanceof WritePlanNode) {
-      return node;
+      return Collections.singletonList(node);
     }
     return defaultRewrite(node, context);
   }
 
-  public PlanNode defaultRewrite(PlanNode node, C context) {
-    List<PlanNode> children =
+  public List<PlanNode> defaultRewrite(PlanNode node, C context) {
+    List<List<PlanNode>> children =
         node.getChildren().stream()
             .map(child -> rewrite(child, context))
             .collect(toImmutableList());
-
-    return node.cloneWithChildren(children);
+    PlanNode newNode = node.clone();
+    for (List<PlanNode> planNodes : children) {
+      planNodes.forEach(newNode::addChild);
+    }
+    return Collections.singletonList(newNode);
   }
 
-  public PlanNode rewrite(PlanNode node, C userContext) {
+  public List<PlanNode> rewrite(PlanNode node, C userContext) {
     return node.accept(this, userContext);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
new file mode 100644
index 0000000000..564a5dbfa4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Multi-child plan node describing a merge sort. It carries the {@link OrderByParameter} to merge
+ * by and the names of the columns the node outputs; both are part of its serialized form.
+ */
+public class MergeSortNode extends MultiChildProcessNode {
+
+  /** Ordering applied when merging the children. */
+  private final OrderByParameter mergeOrderParameter;
+
+  /** Names of the columns this node outputs. */
+  private final List<String> outputColumns;
+
+  /**
+   * @param id id of this plan node
+   * @param mergeOrderParameter ordering to merge by
+   * @param outputColumns names of the output columns; the list is stored as given, not copied
+   */
+  public MergeSortNode(
+      PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> outputColumns) {
+    super(id);
+    this.mergeOrderParameter = mergeOrderParameter;
+    this.outputColumns = outputColumns;
+  }
+
+  public OrderByParameter getMergeOrderParameter() {
+    return mergeOrderParameter;
+  }
+
+  /**
+   * Creates a node with the same id, order parameter and output column list. The copy is built
+   * through the constructor, so this method attaches no children to it.
+   */
+  @Override
+  public PlanNode clone() {
+    return new MergeSortNode(getPlanNodeId(), getMergeOrderParameter(), outputColumns);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumns;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitMergeSort(this, context);
+  }
+
+  /** Writes the node type, the order parameter, the column count and then every column name. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.MERGE_SORT.serialize(byteBuffer);
+    mergeOrderParameter.serializeAttributes(byteBuffer);
+    ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+    for (String column : outputColumns) {
+      ReadWriteIOUtils.write(column, byteBuffer);
+    }
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; same fields, same order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.MERGE_SORT.serialize(stream);
+    mergeOrderParameter.serializeAttributes(stream);
+    ReadWriteIOUtils.write(outputColumns.size(), stream);
+    for (String column : outputColumns) {
+      ReadWriteIOUtils.write(column, stream);
+    }
+  }
+
+  /**
+   * Reads the order parameter, the output columns and finally the plan node id from {@code
+   * byteBuffer}. The node type marker is not read here (presumably the caller has already consumed
+   * it to pick this method - confirm against PlanNodeType).
+   */
+  public static MergeSortNode deserialize(ByteBuffer byteBuffer) {
+    OrderByParameter orderByParameter = OrderByParameter.deserialize(byteBuffer);
+    int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> outputColumns = new ArrayList<>();
+    for (int i = 0; i < columnSize; i++) {
+      outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new MergeSortNode(planNodeId, orderByParameter, outputColumns);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    MergeSortNode that = (MergeSortNode) o;
+    // outputColumns is serialized and exposed through getOutputColumnNames(), so nodes that
+    // differ only in their output columns must not compare equal.
+    return Objects.equals(mergeOrderParameter, that.mergeOrderParameter)
+        && Objects.equals(outputColumns, that.outputColumns);
+  }
+
+  @Override
+  public int hashCode() {
+    // Keep in sync with equals(): both fields take part.
+    return Objects.hash(super.hashCode(), mergeOrderParameter, outputColumns);
+  }
+
+  @Override
+  public String toString() {
+    return "MergeSort-" + this.getPlanNodeId();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleDeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleDeviceViewNode.java
new file mode 100644
index 0000000000..caefda0455
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleDeviceViewNode.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Single-child plan node that carries one device name together with a list of integer indexes
+ * ({@code deviceToMeasurementIndexes}) for that device.
+ *
+ * <p>The output column names are not part of the serialized form, so a node created by {@link
+ * #deserialize(ByteBuffer)} has {@code null} output column names.
+ */
+public class SingleDeviceViewNode extends SingleChildProcessNode {
+
+  private final String device;
+
+  // To reduce memory cost, SingleDeviceViewNode doesn't serialize and deserialize
+  // outputColumnNames.
+  // It just rebuilds using the infos from parent node.
+  private List<String> outputColumnNames;
+  private final List<Integer> deviceToMeasurementIndexes;
+
+  /**
+   * @param id id of this plan node
+   * @param outputColumnNames names of the output columns; stored as given, not copied
+   * @param device name of the device this node belongs to
+   * @param deviceToMeasurementIndexes indexes associated with the device; stored as given
+   */
+  public SingleDeviceViewNode(
+      PlanNodeId id,
+      List<String> outputColumnNames,
+      String device,
+      List<Integer> deviceToMeasurementIndexes) {
+    super(id);
+    this.device = device;
+    this.outputColumnNames = outputColumnNames;
+    this.deviceToMeasurementIndexes = deviceToMeasurementIndexes;
+  }
+
+  /**
+   * Creates a node without output column names ({@code outputColumnNames} stays {@code null}).
+   * Used by {@link #deserialize(ByteBuffer)}.
+   */
+  public SingleDeviceViewNode(
+      PlanNodeId id, String device, List<Integer> deviceToMeasurementIndexes) {
+    super(id);
+    this.device = device;
+    this.deviceToMeasurementIndexes = deviceToMeasurementIndexes;
+  }
+
+  /**
+   * Creates a node with the same id, output column names, device and indexes. The copy is built
+   * through the constructor, so this method attaches no child to it.
+   */
+  @Override
+  public PlanNode clone() {
+    return new SingleDeviceViewNode(
+        getPlanNodeId(), outputColumnNames, device, deviceToMeasurementIndexes);
+  }
+
+  /** @return the output column names, or {@code null} for a node that was deserialized */
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumnNames;
+  }
+
+  public String getDevice() {
+    return device;
+  }
+
+  public List<Integer> getDeviceToMeasurementIndexes() {
+    return deviceToMeasurementIndexes;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSingleDeviceView(this, context);
+  }
+
+  /** Writes the node type, the device, the index count and then every index. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.SINGLE_DEVICE_VIEW.serialize(byteBuffer);
+    ReadWriteIOUtils.write(device, byteBuffer);
+    ReadWriteIOUtils.write(deviceToMeasurementIndexes.size(), byteBuffer);
+    for (Integer index : deviceToMeasurementIndexes) {
+      ReadWriteIOUtils.write(index, byteBuffer);
+    }
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; same fields, same order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.SINGLE_DEVICE_VIEW.serialize(stream);
+    ReadWriteIOUtils.write(device, stream);
+    ReadWriteIOUtils.write(deviceToMeasurementIndexes.size(), stream);
+    for (Integer index : deviceToMeasurementIndexes) {
+      ReadWriteIOUtils.write(index, stream);
+    }
+  }
+
+  /**
+   * Reads the device, the indexes and finally the plan node id from {@code byteBuffer}. The
+   * resulting node has no output column names.
+   */
+  public static SingleDeviceViewNode deserialize(ByteBuffer byteBuffer) {
+    String device = ReadWriteIOUtils.readString(byteBuffer);
+    int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Integer> deviceToMeasurementIndexes = new ArrayList<>(listSize);
+    while (listSize > 0) {
+      deviceToMeasurementIndexes.add(ReadWriteIOUtils.readInt(byteBuffer));
+      listSize--;
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new SingleDeviceViewNode(planNodeId, device, deviceToMeasurementIndexes);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    SingleDeviceViewNode that = (SingleDeviceViewNode) o;
+    // outputColumnNames is null on deserialized nodes, so the comparison has to be null-safe;
+    // calling outputColumnNames.equals(...) directly would throw a NullPointerException there.
+    return Objects.equals(device, that.device)
+        && Objects.equals(outputColumnNames, that.outputColumnNames)
+        && Objects.equals(deviceToMeasurementIndexes, that.deviceToMeasurementIndexes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), device, outputColumnNames, deviceToMeasurementIndexes);
+  }
+
+  @Override
+  public String toString() {
+    return "SingleDeviceView-" + this.getPlanNodeId();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
index 3c1d10220a..a3784773ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
@@ -46,7 +46,6 @@ public class OrderByComponent extends StatementNode {
 
   public void addSortItem(SortItem sortItem) {
     this.sortItemList.add(sortItem);
-
     if (sortItem.getSortKey() == SortKey.TIME) {
       orderByTime = true;
       timeOrderPriority = sortItemList.size() - 1;
@@ -86,7 +85,7 @@ public class OrderByComponent extends StatementNode {
   }
 
+  /**
+   * Returns the ordering of the sort item stored at {@code deviceOrderPriority}. The state check
+   * fails with "The device order is not specified." while {@code deviceOrderPriority} is still
+   * -1.
+   */
   public Ordering getDeviceOrder() {
-    checkState(timeOrderPriority != -1, "The device order is not specified.");
+    checkState(deviceOrderPriority != -1, "The device order is not specified.");
     return sortItemList.get(deviceOrderPriority).getOrdering();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 049c955829..f8c84f38e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -299,6 +299,13 @@ public class QueryStatement extends Statement {
     return orderByComponent.getTimeOrder();
   }
 
+  /**
+   * Returns the device ordering of the result: the ordering given in the ORDER BY component when
+   * it sorts by device, {@link Ordering#ASC} otherwise (including when there is no ORDER BY).
+   */
+  public Ordering getResultDeviceOrder() {
+    boolean orderedByDevice = orderByComponent != null && orderByComponent.isOrderByDevice();
+    return orderedByDevice ? orderByComponent.getDeviceOrder() : Ordering.ASC;
+  }
+
   public List<SortItem> getSortItemList() {
     if (orderByComponent == null) {
       return Collections.emptyList();
@@ -398,10 +405,6 @@ public class QueryStatement extends Statement {
       if (isOrderByTimeseries()) {
         throw new SemanticException("Sorting by timeseries is only supported in last queries.");
       }
-      if (isOrderByDevice()) {
-        // TODO support sort by device
-        throw new SemanticException("Sorting by device is not yet supported.");
-      }
     }
 
     if (isLastQuery()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
new file mode 100644
index 0000000000..f80c9c8eb2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Fixed-capacity, array-backed binary heap of {@link MergeSortKey}s. The key that the supplied
+ * comparator orders first is kept at the root and is the one returned by {@link #peek()} and
+ * {@link #poll()}.
+ */
+public class MergeSortHeap {
+  /** Heap storage; only the first {@code heapSize} slots hold live keys, the rest are null. */
+  private final MergeSortKey[] heap;
+
+  private int heapSize;
+
+  private final Comparator<MergeSortKey> comparator;
+
+  /**
+   * @param childNum capacity of the heap, i.e. the maximum number of keys held at the same time
+   * @param comparator ordering of the keys; the key it orders first sits at the root
+   */
+  public MergeSortHeap(int childNum, Comparator<MergeSortKey> comparator) {
+    this.heap = new MergeSortKey[childNum];
+    this.heapSize = 0;
+    this.comparator = comparator;
+  }
+
+  public boolean isEmpty() {
+    return heapSize == 0;
+  }
+
+  /**
+   * Adds a key to the heap.
+   *
+   * @throws IllegalStateException if the heap already holds as many keys as its capacity
+   */
+  public void push(MergeSortKey mergeSortKey) {
+    // Fail before touching the array: sifting into a full heap would overwrite a parent slot
+    // before running out of bounds and leave the heap corrupted.
+    if (heapSize == heap.length) {
+      throw new IllegalStateException("MergeSortHeap is full, capacity: " + heap.length);
+    }
+    shiftUp(heapSize, mergeSortKey);
+    ++heapSize;
+  }
+
+  /**
+   * Removes and returns the root key.
+   *
+   * @throws IllegalStateException if the heap is empty
+   */
+  public MergeSortKey poll() {
+    if (heapSize == 0) {
+      throw new IllegalStateException("MergeSortHeap is empty");
+    }
+    MergeSortKey res = heap[0];
+    // Shrink first so the key being moved is no longer visible as a child while sifting, and
+    // clear the vacated slot so the heap does not keep polled keys (and their TsBlocks) alive.
+    heapSize--;
+    MergeSortKey last = heap[heapSize];
+    heap[heapSize] = null;
+    if (heapSize > 0) {
+      shiftDown(0, last);
+    }
+    return res;
+  }
+
+  /** @return the root key without removing it, or {@code null} if the heap is empty */
+  public MergeSortKey peek() {
+    return heapSize == 0 ? null : heap[0];
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(heap);
+  }
+
+  /**
+   * @return the index of the child of {@code index} that the comparator orders first (the left
+   *     child on ties), or -1 if {@code index} has no child inside the heap
+   */
+  private int getSmallerChildIndex(int index) {
+    final int leftChildIndex = (index << 1) + 1;
+    final int rightChildIndex = (index << 1) + 2;
+
+    if (heapSize <= leftChildIndex) {
+      return -1;
+    }
+    if (heapSize <= rightChildIndex) {
+      return leftChildIndex;
+    }
+    return comparator.compare(heap[leftChildIndex], heap[rightChildIndex]) > 0
+        ? rightChildIndex
+        : leftChildIndex;
+  }
+
+  /** Places {@code parent} at {@code parentIndex} and moves it down until no child precedes it. */
+  private void shiftDown(int parentIndex, MergeSortKey parent) {
+    int index = parentIndex;
+    int childIndex = getSmallerChildIndex(index);
+    while (childIndex != -1 && comparator.compare(parent, heap[childIndex]) > 0) {
+      heap[index] = heap[childIndex];
+      index = childIndex;
+      childIndex = getSmallerChildIndex(index);
+    }
+    heap[index] = parent;
+  }
+
+  /** Places {@code child} at {@code childIndex} and moves it up while it precedes its parent. */
+  private void shiftUp(int childIndex, MergeSortKey child) {
+    int index = childIndex;
+    while (index > 0) {
+      int parentIndex = (index - 1) >>> 1;
+      MergeSortKey parent = heap[parentIndex];
+      if (comparator.compare(parent, child) <= 0) {
+        break;
+      }
+      heap[index] = parent;
+      index = parentIndex;
+    }
+    heap[index] = child;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
new file mode 100644
index 0000000000..542501d51c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+/**
+ * Mutable position inside a {@link TsBlock}: the block itself, a row index and a column index.
+ * All fields are public and may be reassigned by users of the key.
+ */
+public class MergeSortKey {
+
+  public TsBlock tsBlock;
+  public int rowIndex;
+
+  public int columnIndex;
+
+  /** Creates a key for the given row; {@code columnIndex} is 0. */
+  public MergeSortKey(TsBlock tsBlock, int rowIndex) {
+    this(tsBlock, rowIndex, 0);
+  }
+
+  /** Creates a key for the given row and column of {@code tsBlock}. */
+  public MergeSortKey(TsBlock tsBlock, int rowIndex, int columnIndex) {
+    this.tsBlock = tsBlock;
+    this.rowIndex = rowIndex;
+    this.columnIndex = columnIndex;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
new file mode 100644
index 0000000000..cd5831502e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -0,0 +1,1475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.MergeSortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MergeSortComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import io.airlift.units.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MergeSortOperatorTest {
+
+  private static final String MERGE_SORT_OPERATOR_TEST_SG = "root.MergeSortOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private static final String DEVICE0 = MERGE_SORT_OPERATOR_TEST_SG + ".device0";
+  private static final String DEVICE1 = MERGE_SORT_OPERATOR_TEST_SG + ".device1";
+  private static final String DEVICE2 = MERGE_SORT_OPERATOR_TEST_SG + ".device2";
+  private static final String DEVICE3 = MERGE_SORT_OPERATOR_TEST_SG + ".device3";
+
+  /**
+   * Runs before each test: delegates to {@code SeriesReaderTestUtil.setUp}, handing over the
+   * schema, device and resource lists kept by this class and the storage group name of this test.
+   */
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, MERGE_SORT_OPERATOR_TEST_SG);
+  }
+
+  /**
+   * Runs after each test: delegates to {@code SeriesReaderTestUtil.tearDown} with the sequence
+   * and unsequence resources kept by this class.
+   */
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Computes the value that belongs to a timestamp:
+   *
+   * <ul>
+   *   <li>below 200: {@code 20000 + expectedTime}
+   *   <li>in [260, 300) or [380, 400): {@code expectedTime} itself
+   *   <li>everything else (from 200 on): {@code 10000 + expectedTime}
+   * </ul>
+   */
+  long getValue(long expectedTime) {
+    if (expectedTime < 200) {
+      return 20000 + expectedTime;
+    }
+    // From here on expectedTime >= 200; only two windows keep the raw timestamp as value.
+    boolean keepsRawValue =
+        (expectedTime >= 260 && expectedTime < 300)
+            || (expectedTime >= 380 && expectedTime < 400);
+    return keepsRawValue ? expectedTime : 10000 + expectedTime;
+  }
+
+  // ------------------------------------------------------------------------------------------------
+  //                                   order by time - 1
+  // ------------------------------------------------------------------------------------------------
+  //                                    MergeSortOperator
+  //                              ____________|_______________
+  //                              /           |               \
+  //           SingleDeviceViewOperator SingleDeviceViewOperator SingleDeviceViewOperator
+  //                     /                     |                              \
+  //        SeriesScanOperator      TimeJoinOperator                TimeJoinOperator
+  //                                  /                \              /               \
+  //                  SeriesScanOperator SeriesScanOperator SeriesScanOperator   SeriesScanOperator
+  // ------------------------------------------------------------------------------------------------
+  /**
+   * Builds the operator tree used by the "order by time - 1" tests: a MergeSortOperator over three
+   * SingleDeviceViewOperators (device0 fed by one SeriesScanOperator, device1 and device2 each fed
+   * by a TimeJoinOperator over two SeriesScanOperators). The merge comparator sorts by time first
+   * and by device second.
+   *
+   * @param timeOrdering ordering of the time sort item; also decides whether the scans and time
+   *     joins run ascending or descending
+   * @param deviceOrdering ordering of the device sort item
+   * @return the root operator; the test is failed (and null returned) on an IllegalPathException
+   */
+  public MergeSortOperator mergeSortOperatorTest(Ordering timeOrdering, Ordering deviceOrdering) {
+    // NOTE(review): this executor is not shut down in this method - confirm whether it has to be
+    // released once the returned operator has been consumed.
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      // Register one operator context per operator (ids 1..11). The contexts are fetched below by
+      // list position, presumably in registration order, so position i belongs to id i + 1 -
+      // keep both in sync when adding operators.
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          6, new PlanNodeId("6"), SingleDeviceViewOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          7, new PlanNodeId("7"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          8, new PlanNodeId("8"), SingleDeviceViewOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          10, new PlanNodeId("10"), SingleDeviceViewOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          11, new PlanNodeId("11"), MergeSortOperator.class.getSimpleName());
+
+      // One series for device0, two series (sensor0, sensor1) each for device1 and device2.
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath3 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+      MeasurementPath measurementPath4 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath5 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+      // The five scans differ only in plan node id, path, sensor name and operator context; the
+      // last constructor argument is true for ascending time order.
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator3 =
+          new SeriesScanOperator(
+              planNodeId3,
+              measurementPath3,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator3
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator4 =
+          new SeriesScanOperator(
+              planNodeId4,
+              measurementPath4,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator4
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator5 =
+          new SeriesScanOperator(
+              planNodeId5,
+              measurementPath5,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator5
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      // Column types shared by every SingleDeviceViewOperator and by the MergeSortOperator: one
+      // TEXT column (presumably the device column - confirm) followed by five INT32 columns.
+      List<TSDataType> tsDataTypes =
+          new LinkedList<>(
+              Arrays.asList(
+                  TSDataType.TEXT,
+                  TSDataType.INT32,
+                  TSDataType.INT32,
+                  TSDataType.INT32,
+                  TSDataType.INT32,
+                  TSDataType.INT32));
+      // device0 contributes index 1, device1 indexes 2-3 and device2 indexes 4-5.
+      SingleDeviceViewOperator singleDeviceViewOperator1 =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              DEVICE0,
+              seriesScanOperator1,
+              Collections.singletonList(1),
+              tsDataTypes);
+
+      TimeJoinOperator timeJoinOperator1 =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(6),
+              Arrays.asList(seriesScanOperator2, seriesScanOperator3),
+              timeOrdering,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(
+                      new InputLocation(0, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()),
+                  new SingleColumnMerger(
+                      new InputLocation(1, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator())),
+              timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+      SingleDeviceViewOperator singleDeviceViewOperator2 =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(7),
+              DEVICE1,
+              timeJoinOperator1,
+              Arrays.asList(2, 3),
+              tsDataTypes);
+
+      TimeJoinOperator timeJoinOperator2 =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(8),
+              Arrays.asList(seriesScanOperator4, seriesScanOperator5),
+              timeOrdering,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(
+                      new InputLocation(0, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()),
+                  new SingleColumnMerger(
+                      new InputLocation(1, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator())),
+              timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+      SingleDeviceViewOperator singleDeviceViewOperator3 =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(9),
+              DEVICE2,
+              timeJoinOperator2,
+              Arrays.asList(4, 5),
+              tsDataTypes);
+
+      // Root of the tree: merge the three device views, sorting by time first, then by device.
+      return new MergeSortOperator(
+          fragmentInstanceContext.getOperatorContexts().get(10),
+          Arrays.asList(
+              singleDeviceViewOperator1, singleDeviceViewOperator2, singleDeviceViewOperator3),
+          tsDataTypes,
+          MergeSortComparator.getComparator(
+              Arrays.asList(
+                  new SortItem(SortKey.TIME, timeOrdering),
+                  new SortItem(SortKey.DEVICE, deviceOrdering))));
+    } catch (IllegalPathException e) {
+      // A malformed series path is a defect of the test itself: report it and fail.
+      e.printStackTrace();
+      fail();
+      return null;
+    }
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest(ASC, ASC)} and verifies that every
+   * block has 6 value columns, timestamps never decrease, rows cycle through DEVICE0, DEVICE1,
+   * DEVICE2 in that order, each row leaves the other devices' columns null, and 1500 rows are
+   * returned in total.
+   */
+  @Test
+  public void testOrderByTime1() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.ASC);
+    long lastTime = -1;
+    // position of the next expected device inside the current DEVICE0 -> DEVICE1 -> DEVICE2 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(6, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // make sure the device column is by asc
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+          assertEquals(2, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(1500, count);
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest(ASC, DESC)} and verifies that every
+   * block has 6 value columns, timestamps never decrease, rows cycle through DEVICE2, DEVICE1,
+   * DEVICE0 in that order, each row leaves the other devices' columns null, and 1500 rows are
+   * returned in total.
+   */
+  @Test
+  public void testOrderByTime2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.DESC);
+    long lastTime = -1;
+    // position of the next expected device inside the current DEVICE2 -> DEVICE1 -> DEVICE0 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(6, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE2)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // make sure the device column is by desc
+          assertEquals(2, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(1500, count);
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest(DESC, DESC)} and verifies that every
+   * block has 6 value columns, timestamps never increase, rows cycle through DEVICE2, DEVICE1,
+   * DEVICE0 in that order, each row leaves the other devices' columns null, and 1500 rows are
+   * returned in total.
+   */
+  @Test
+  public void testOrderByTime3() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.DESC);
+    long lastTime = Long.MAX_VALUE;
+    // position of the next expected device inside the current DEVICE2 -> DEVICE1 -> DEVICE0 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(6, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE2)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // make sure the device column is by desc
+          assertEquals(2, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(1500, count);
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest(DESC, ASC)} and verifies that every
+   * block has 6 value columns, timestamps never increase, rows cycle through DEVICE0, DEVICE1,
+   * DEVICE2 in that order, each row leaves the other devices' columns null, and 1500 rows are
+   * returned in total.
+   */
+  @Test
+  public void testOrderByTime4() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.ASC);
+    long lastTime = Long.MAX_VALUE;
+    // position of the next expected device inside the current DEVICE0 -> DEVICE1 -> DEVICE2 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(6, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // make sure the device column is by asc
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(4).isNull(i));
+          assertTrue(tsBlock.getColumn(5).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertTrue(tsBlock.getColumn(1).isNull(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+          assertEquals(2, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(1500, count);
+  }
+
+  // ------------------------------------------------------------------------------------------------
+  //                                        order by time - 2
+  // ------------------------------------------------------------------------------------------------
+  // [SSO]:SeriesScanOperator
+  // [SDO]:SingleDeviceViewOperator
+  //                                       MergeSortOperator
+  //                                    ___________|___________
+  //                                  /                         \
+  //                   MergeSortOperator                        MergeSortOperator
+  //                   /               |                       /               \
+  //               [SDO]             [SDO]                   [SDO]            [SDO]
+  //                 |                 |                       |                |
+  //               [SSO]       TimeJoinOperator        TimeJoinOperator   TimeJoinOperator
+  //                              /         \              /         \       /         \
+  //                            [SSO]      [SSO]         [SSO]      [SSO] [SSO]        [SSO]
+  // ------------------------------------------------------------------------------------------------
+  /**
+   * Builds a two-level merge-sort operator tree over four devices: DEVICE0 (one INT32 series) and
+   * DEVICE1..DEVICE3 (two INT32 series each, combined by a TimeJoinOperator). Each device is
+   * wrapped in a SingleDeviceViewOperator, the views are merged pairwise, and the two intermediate
+   * MergeSortOperators are merged again by the returned root operator.
+   *
+   * <p>NOTE(review): the notification executor created here is never shut down. It is left
+   * running because the returned operator is consumed after this method returns; confirm whether
+   * the test fixture should own and close it.
+   *
+   * @param timeOrdering ordering applied to the time sort key and to every scan / time join
+   * @param deviceOrdering ordering applied to the device sort key
+   * @return the root MergeSortOperator; a failed path construction fails the running test
+   */
+  public MergeSortOperator mergeSortOperatorTest2(Ordering timeOrdering, Ordering deviceOrdering) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+
+      // Contexts are fetched below via getOperatorContexts().get(index), so the registration
+      // order is preserved exactly: ids 1-7 scans, 8-10 time joins, 11-14 device views,
+      // 15-17 merge sorts. The code below pairs id N with index N - 1.
+      PlanNodeId[] scanNodeIds = new PlanNodeId[7];
+      for (int i = 0; i < scanNodeIds.length; i++) {
+        scanNodeIds[i] = new PlanNodeId(String.valueOf(i + 1));
+        fragmentInstanceContext.addOperatorContext(
+            i + 1, scanNodeIds[i], SeriesScanOperator.class.getSimpleName());
+      }
+      for (int id = 8; id <= 17; id++) {
+        String operatorType;
+        if (id <= 10) {
+          operatorType = TimeJoinOperator.class.getSimpleName();
+        } else if (id <= 14) {
+          operatorType = SingleDeviceViewOperator.class.getSimpleName();
+        } else {
+          operatorType = MergeSortOperator.class.getSimpleName();
+        }
+        fragmentInstanceContext.addOperatorContext(
+            id, new PlanNodeId(String.valueOf(id)), operatorType);
+      }
+
+      // {device, sensor} of the series read by scan i (context index i, plan node id i + 1)
+      String[][] scannedSeries = {
+        {"device0", "sensor0"},
+        {"device1", "sensor0"},
+        {"device1", "sensor1"},
+        {"device2", "sensor0"},
+        {"device2", "sensor1"},
+        {"device3", "sensor0"},
+        {"device3", "sensor1"}
+      };
+      boolean ascending = timeOrdering == Ordering.ASC;
+      SeriesScanOperator[] scans = new SeriesScanOperator[scannedSeries.length];
+      for (int i = 0; i < scans.length; i++) {
+        scans[i] =
+            newInt32ScanOperator(
+                fragmentInstanceContext,
+                i,
+                scanNodeIds[i],
+                scannedSeries[i][0],
+                scannedSeries[i][1],
+                ascending);
+      }
+
+      List<TSDataType> tsDataTypes =
+          new LinkedList<>(Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32));
+
+      TimeJoinOperator timeJoinOperator1 =
+          newTwoWayTimeJoinOperator(fragmentInstanceContext, 7, scans[1], scans[2], timeOrdering);
+      TimeJoinOperator timeJoinOperator2 =
+          newTwoWayTimeJoinOperator(fragmentInstanceContext, 8, scans[3], scans[4], timeOrdering);
+      TimeJoinOperator timeJoinOperator3 =
+          newTwoWayTimeJoinOperator(fragmentInstanceContext, 9, scans[5], scans[6], timeOrdering);
+
+      SingleDeviceViewOperator singleDeviceViewOperator1 =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(10),
+              DEVICE0,
+              scans[0],
+              Collections.singletonList(1),
+              tsDataTypes);
+      SingleDeviceViewOperator singleDeviceViewOperator2 =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(11),
+              DEVICE1,
+              timeJoinOperator1,
+              Arrays.asList(1, 2),
+              tsDataTypes);
+      SingleDeviceViewOperator singleDeviceViewOperator3 =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(12),
+              DEVICE2,
+              timeJoinOperator2,
+              Arrays.asList(1, 2),
+              tsDataTypes);
+      SingleDeviceViewOperator singleDeviceViewOperator4 =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(13),
+              DEVICE3,
+              timeJoinOperator3,
+              Arrays.asList(1, 2),
+              tsDataTypes);
+
+      MergeSortOperator mergeSortOperator1 =
+          new MergeSortOperator(
+              fragmentInstanceContext.getOperatorContexts().get(14),
+              Arrays.asList(singleDeviceViewOperator1, singleDeviceViewOperator2),
+              tsDataTypes,
+              MergeSortComparator.getComparator(
+                  timeThenDeviceSortItems(timeOrdering, deviceOrdering)));
+      MergeSortOperator mergeSortOperator2 =
+          new MergeSortOperator(
+              fragmentInstanceContext.getOperatorContexts().get(15),
+              Arrays.asList(singleDeviceViewOperator3, singleDeviceViewOperator4),
+              tsDataTypes,
+              MergeSortComparator.getComparator(
+                  timeThenDeviceSortItems(timeOrdering, deviceOrdering)));
+
+      return new MergeSortOperator(
+          fragmentInstanceContext.getOperatorContexts().get(16),
+          Arrays.asList(mergeSortOperator1, mergeSortOperator2),
+          tsDataTypes,
+          MergeSortComparator.getComparator(timeThenDeviceSortItems(timeOrdering, deviceOrdering)));
+    } catch (IllegalPathException e) {
+      // surface the cause in the test report instead of only printing it to stderr
+      fail("Failed to build the merge sort operator tree: " + e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Creates an INT32 SeriesScanOperator for {@code <storage group>.<device>.<sensor>}, attaches
+   * the test's seq/unseq resources and sets a 500 ms max run time on its operator context.
+   */
+  private SeriesScanOperator newInt32ScanOperator(
+      FragmentInstanceContext fragmentInstanceContext,
+      int contextIndex,
+      PlanNodeId planNodeId,
+      String device,
+      String sensor,
+      boolean ascending)
+      throws IllegalPathException {
+    MeasurementPath measurementPath =
+        new MeasurementPath(
+            MERGE_SORT_OPERATOR_TEST_SG + "." + device + "." + sensor, TSDataType.INT32);
+    // the two null arguments are passed exactly as before (presumably "no filter" - TODO confirm)
+    SeriesScanOperator scanOperator =
+        new SeriesScanOperator(
+            planNodeId,
+            measurementPath,
+            Collections.singleton(sensor),
+            TSDataType.INT32,
+            fragmentInstanceContext.getOperatorContexts().get(contextIndex),
+            null,
+            null,
+            ascending);
+    scanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+    scanOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+    return scanOperator;
+  }
+
+  /** Creates a TimeJoinOperator joining two INT32 scans, one output column per input. */
+  private TimeJoinOperator newTwoWayTimeJoinOperator(
+      FragmentInstanceContext fragmentInstanceContext,
+      int contextIndex,
+      SeriesScanOperator left,
+      SeriesScanOperator right,
+      Ordering timeOrdering) {
+    return new TimeJoinOperator(
+        fragmentInstanceContext.getOperatorContexts().get(contextIndex),
+        Arrays.asList(left, right),
+        timeOrdering,
+        Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+        Arrays.asList(
+            new SingleColumnMerger(
+                new InputLocation(0, 0),
+                timeOrdering == Ordering.ASC
+                    ? new AscTimeComparator()
+                    : new DescTimeComparator()),
+            new SingleColumnMerger(
+                new InputLocation(1, 0),
+                timeOrdering == Ordering.ASC
+                    ? new AscTimeComparator()
+                    : new DescTimeComparator())),
+        timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+  }
+
+  /** Returns a fresh sort specification: time first, then device. */
+  private List<SortItem> timeThenDeviceSortItems(Ordering timeOrdering, Ordering deviceOrdering) {
+    return Arrays.asList(
+        new SortItem(SortKey.TIME, timeOrdering), new SortItem(SortKey.DEVICE, deviceOrdering));
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest2(ASC, ASC)} and verifies that every
+   * block has 3 value columns, timestamps never decrease, rows cycle through DEVICE0, DEVICE1,
+   * DEVICE2, DEVICE3 in that order, DEVICE0 rows have a null third column, and 2000 rows are
+   * returned in total.
+   */
+  @Test
+  public void testOrderByTime1_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.ASC);
+    long lastTime = -1;
+    // position of the next expected device inside the current DEVICE0 -> ... -> DEVICE3 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest2(ASC, DESC)} and verifies that every
+   * block has 3 value columns, timestamps never decrease, rows cycle through DEVICE3, DEVICE2,
+   * DEVICE1, DEVICE0 in that order, DEVICE0 rows have a null third column, and 2000 rows are
+   * returned in total.
+   */
+  @Test
+  public void testOrderByTime2_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.DESC);
+    long lastTime = -1;
+    // position of the next expected device inside the current DEVICE3 -> ... -> DEVICE0 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest2(DESC, DESC)} and verifies that
+   * every block has 3 value columns, timestamps never increase, rows cycle through DEVICE3,
+   * DEVICE2, DEVICE1, DEVICE0 in that order, DEVICE0 rows have a null third column, and 2000 rows
+   * are returned in total.
+   */
+  @Test
+  public void testOrderByTime3_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.DESC);
+    long lastTime = Long.MAX_VALUE;
+    // position of the next expected device inside the current DEVICE3 -> ... -> DEVICE0 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Drains the operator built by {@code mergeSortOperatorTest2(DESC, ASC)} and verifies that every
+   * block has 3 value columns, timestamps never increase, rows cycle through DEVICE0, DEVICE1,
+   * DEVICE2, DEVICE3 in that order, DEVICE0 rows have a null third column, and 2000 rows are
+   * returned in total.
+   */
+  @Test
+  public void testOrderByTime4_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.ASC);
+    long lastTime = Long.MAX_VALUE;
+    // position of the next expected device inside the current DEVICE0 -> ... -> DEVICE3 cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+  // ------------------------------------------------------------------------------------------------
+  //                                   order by device
+  // ------------------------------------------------------------------------------------------------
+  // [SSO]:SeriesScanOperator
+  //                                     MergeSortOperator
+  //                                  ___________|___________
+  //                                 /                      \
+  //                    DeviceViewOperator              DeviceViewOperator
+  //                    /               |                /               \
+  //                [SSO]       TimeJoinOperator TimeJoinOperator    TimeJoinOperator
+  //                               /         \      /         \         /         \
+  //                             [SSO]      [SSO] [SSO]      [SSO]  [SSO]       [SSO]
+  // ------------------------------------------------------------------------------------------------
+  public MergeSortOperator mergeSortOperatorTest3(Ordering timeOrdering, Ordering deviceOrdering) {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId7 = new PlanNodeId("7");
+      fragmentInstanceContext.addOperatorContext(
+          7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+
+      fragmentInstanceContext.addOperatorContext(
+          8, new PlanNodeId("8"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          10, new PlanNodeId("10"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          11, new PlanNodeId("11"), DeviceViewOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          12, new PlanNodeId("12"), DeviceViewOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          13, new PlanNodeId("13"), MergeSortOperator.class.getSimpleName());
+
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath3 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+      MeasurementPath measurementPath4 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath5 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+      MeasurementPath measurementPath6 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device3.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath7 =
+          new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device3.sensor1", TSDataType.INT32);
+
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator3 =
+          new SeriesScanOperator(
+              planNodeId3,
+              measurementPath3,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator3
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator4 =
+          new SeriesScanOperator(
+              planNodeId4,
+              measurementPath4,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator4
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator5 =
+          new SeriesScanOperator(
+              planNodeId5,
+              measurementPath5,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator5
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator6 =
+          new SeriesScanOperator(
+              planNodeId6,
+              measurementPath6,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator6
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      SeriesScanOperator seriesScanOperator7 =
+          new SeriesScanOperator(
+              planNodeId7,
+              measurementPath7,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(6),
+              null,
+              null,
+              timeOrdering == Ordering.ASC);
+      seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator7
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      List<TSDataType> tsDataTypes =
+          new LinkedList<>(Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32));
+
+      TimeJoinOperator timeJoinOperator1 =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(7),
+              Arrays.asList(seriesScanOperator2, seriesScanOperator3),
+              timeOrdering,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(
+                      new InputLocation(0, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()),
+                  new SingleColumnMerger(
+                      new InputLocation(1, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator())),
+              timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+      TimeJoinOperator timeJoinOperator2 =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(8),
+              Arrays.asList(seriesScanOperator4, seriesScanOperator5),
+              timeOrdering,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(
+                      new InputLocation(0, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()),
+                  new SingleColumnMerger(
+                      new InputLocation(1, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator())),
+              timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+      TimeJoinOperator timeJoinOperator3 =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(9),
+              Arrays.asList(seriesScanOperator6, seriesScanOperator7),
+              timeOrdering,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(
+                      new InputLocation(0, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()),
+                  new SingleColumnMerger(
+                      new InputLocation(1, 0),
+                      timeOrdering == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator())),
+              timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+      List<String> devices = new ArrayList<>(Arrays.asList(DEVICE0, DEVICE1, DEVICE2, DEVICE3));
+      if (deviceOrdering == Ordering.DESC) Collections.reverse(devices);
+      List<List<Integer>> deviceColumnIndex = new ArrayList<>();
+      deviceColumnIndex.add(Collections.singletonList(1));
+      deviceColumnIndex.add(Arrays.asList(1, 2));
+      if (deviceOrdering == Ordering.DESC) Collections.reverse(deviceColumnIndex);
+      DeviceViewOperator deviceViewOperator1 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(10),
+              deviceOrdering == Ordering.ASC
+                  ? Arrays.asList(DEVICE0, DEVICE1)
+                  : Arrays.asList(DEVICE1, DEVICE0),
+              deviceOrdering == Ordering.ASC
+                  ? Arrays.asList(seriesScanOperator1, timeJoinOperator1)
+                  : Arrays.asList(timeJoinOperator1, seriesScanOperator1),
+              deviceColumnIndex,
+              tsDataTypes);
+      deviceColumnIndex = new ArrayList<>();
+      deviceColumnIndex.add(Arrays.asList(1, 2));
+      deviceColumnIndex.add(Arrays.asList(1, 2));
+      DeviceViewOperator deviceViewOperator2 =
+          new DeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(11),
+              deviceOrdering == Ordering.ASC
+                  ? Arrays.asList(DEVICE2, DEVICE3)
+                  : Arrays.asList(DEVICE3, DEVICE2),
+              deviceOrdering == Ordering.ASC
+                  ? Arrays.asList(timeJoinOperator2, timeJoinOperator3)
+                  : Arrays.asList(timeJoinOperator3, timeJoinOperator2),
+              deviceColumnIndex,
+              tsDataTypes);
+      return new MergeSortOperator(
+          fragmentInstanceContext.getOperatorContexts().get(12),
+          Arrays.asList(deviceViewOperator1, deviceViewOperator2),
+          tsDataTypes,
+          MergeSortComparator.getComparator(
+              Arrays.asList(
+                  new SortItem(SortKey.DEVICE, deviceOrdering),
+                  new SortItem(SortKey.TIME, timeOrdering))));
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+      return null;
+    }
+  }
+
+  /**
+   * Time ascending, device ascending: the first 500 rows must belong to device0, the next 500 to
+   * device1, then device2, then device3, and timestamps must never decrease inside one device.
+   */
+  @Test
+  public void testOrderByDevice1() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.ASC);
+    long lastTime = -1;
+    // number of rows seen so far; each device owns a window of 500 consecutive rows
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      int rowCount = tsBlock.getPositionCount();
+      count += rowCount;
+      for (int i = 0; i < rowCount; i++) {
+        long time = tsBlock.getTimeByIndex(i);
+        assertTrue(time >= lastTime);
+        lastTime = time;
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the next device starts over in time
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Time ascending, device descending: the first 500 rows must belong to device3, the next 500 to
+   * device2, then device1, then device0, and timestamps must never decrease inside one device.
+   */
+  @Test
+  public void testOrderByDevice2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.DESC);
+    long lastTime = -1;
+    // number of rows seen so far; each device owns a window of 500 consecutive rows
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      int rowCount = tsBlock.getPositionCount();
+      count += rowCount;
+      for (int i = 0; i < rowCount; i++) {
+        long time = tsBlock.getTimeByIndex(i);
+        assertTrue(time >= lastTime);
+        lastTime = time;
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the next device starts over in time
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Time descending, device ascending: the first 500 rows must belong to device0, the next 500 to
+   * device1, then device2, then device3, and timestamps must never increase inside one device.
+   */
+  @Test
+  public void testOrderByDevice3() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.ASC);
+    long lastTime = Long.MAX_VALUE;
+    // number of rows seen so far; each device owns a window of 500 consecutive rows
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      int rowCount = tsBlock.getPositionCount();
+      count += rowCount;
+      for (int i = 0; i < rowCount; i++) {
+        long time = tsBlock.getTimeByIndex(i);
+        assertTrue(time <= lastTime);
+        lastTime = time;
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the next device starts over from the latest time
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Time descending, device descending: the first 500 rows must belong to device3, the next 500
+   * to device2, then device1, then device0, and timestamps must never increase inside one device.
+   */
+  @Test
+  public void testOrderByDevice4() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.DESC);
+    long lastTime = Long.MAX_VALUE;
+    // number of rows seen so far; each device owns a window of 500 consecutive rows
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      int rowCount = tsBlock.getPositionCount();
+      count += rowCount;
+      for (int i = 0; i < rowCount; i++) {
+        long time = tsBlock.getTimeByIndex(i);
+        assertTrue(time <= lastTime);
+        lastTime = time;
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the next device starts over from the latest time
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(time), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(time), tsBlock.getColumn(1).getInt(i));
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
new file mode 100644
index 0000000000..b77aed1e0b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import io.airlift.units.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit test for {@link SingleDeviceViewOperator}: a time join over two series of one device is
+ * wrapped in a single-device view and the produced TsBlocks are checked row by row.
+ */
+public class SingleDeviceViewOperatorTest {
+  private static final String SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG =
+      "root.SingleDeviceViewOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas,
+        deviceIds,
+        seqResources,
+        unSeqResources,
+        SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Construct seriesScanOperator:[device0.sensor0, device0.sensor1] joined on time and wrapped in
+   * a SingleDeviceViewOperator whose output has four value columns [Device, col1, col2, col3].
+   * The two sensors are mapped to col1 and col2; nothing is mapped to col3, so it should be null
+   * in every row.
+   */
+  @Test
+  public void singleDeviceViewOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+
+      // The operators below fetch their context with getOperatorContexts().get(k), relying on
+      // the k-th registered context being the one added with operator id k + 1.
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      // label the context with the operator it belongs to, not with the test class of that name
+      fragmentInstanceContext.addOperatorContext(
+          3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          4, new PlanNodeId("4"), SingleDeviceViewOperator.class.getSimpleName());
+
+      String device = SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG + ".device0";
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(
+              SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(
+              SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+              Ordering.ASC,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())),
+              new AscTimeComparator());
+      SingleDeviceViewOperator singleDeviceViewOperator =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              device,
+              timeJoinOperator,
+              Arrays.asList(1, 2),
+              Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32, TSDataType.INT32));
+      // number of blocks consumed so far
+      int count = 0;
+      // number of rows consumed so far
+      int total = 0;
+      while (singleDeviceViewOperator.hasNext()) {
+        TsBlock tsBlock = singleDeviceViewOperator.next();
+        assertEquals(4, tsBlock.getValueColumnCount());
+        total += tsBlock.getPositionCount();
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          long expectedTime = i + 20L * (count % 25);
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          assertEquals(device, tsBlock.getColumn(0).getBinary(i).toString());
+          // Expected value per time range; presumably mirrors the data written by
+          // SeriesReaderTestUtil.setUp - confirm there before changing the ranges.
+          long expectedValue;
+          if (expectedTime < 200) {
+            expectedValue = 20000 + expectedTime;
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            expectedValue = 10000 + expectedTime;
+          } else {
+            expectedValue = expectedTime;
+          }
+          assertEquals(expectedValue, tsBlock.getColumn(1).getInt(i));
+          assertEquals(expectedValue, tsBlock.getColumn(2).getInt(i));
+          // no input column is mapped to the last output column
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+        }
+        count++;
+      }
+      assertEquals(500, total);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      // the test owns the executor, so release its thread once the operator is drained
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index f6ca5a3f6b..8b5dd9a345 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -255,7 +255,7 @@ public class QueryLogicalPlanUtil {
   static {
     String sql =
         "SELECT * FROM root.sg.* WHERE time > 100 and s1 > 10 "
-            + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+            + "ORDER BY DEVICE,TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
 
     QueryId queryId = new QueryId("test");
     List<PlanNode> sourceNodeList1 = new ArrayList<>();
@@ -659,7 +659,7 @@ public class QueryLogicalPlanUtil {
   static {
     String sql =
         "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
-            + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+            + "ORDER BY DEVICE,TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
 
     QueryId queryId = new QueryId("test");
     Filter timeFilter = TimeFilter.gt(100);
@@ -892,7 +892,7 @@ public class QueryLogicalPlanUtil {
   static {
     String sql =
         "SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
-            + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+            + "ORDER BY DEVICE,TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
 
     QueryId queryId = new QueryId("test");
     List<PlanNode> sourceNodeList1 = new ArrayList<>();
@@ -907,9 +907,7 @@ public class QueryLogicalPlanUtil {
             (MeasurementPath) schemaMap.get("root.sg.d1.s2"),
             Ordering.DESC));
     sourceNodeList1.forEach(
-        planNode -> {
-          ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100));
-        });
+        planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
 
     TimeJoinNode timeJoinNode1 =
         new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 991cebb099..81d9e0b29a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -35,9 +35,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
@@ -757,7 +757,7 @@ public class AggregationDistributionTest {
         plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
     PlanNode f3Root =
         plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
-    assertTrue(f1Root instanceof DeviceMergeNode);
+    assertTrue(f1Root instanceof MergeSortNode);
     assertTrue(f2Root instanceof VerticallyConcatNode);
     assertTrue(f3Root instanceof DeviceViewNode);
     assertTrue(f3Root.getChildren().get(0) instanceof VerticallyConcatNode);