You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/12/28 09:42:10 UTC

[iotdb] branch master updated: [IOTDB-1942] Support align by device query in new vector (#4435)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c1d485  [IOTDB-1942] Support align by device query in new vector (#4435)
3c1d485 is described below

commit 3c1d4856cecba7ff75a65c53339b70eeab058228
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Tue Dec 28 17:41:36 2021 +0800

    [IOTDB-1942] Support align by device query in new vector (#4435)
---
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |    5 +-
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   |    6 +-
 .../IoTDBAggregationWithoutValueFilterIT.java      |   18 +-
 .../integration/aligned/IoTDBAlignByDevice2IT.java |   68 +
 .../integration/aligned/IoTDBAlignByDeviceIT.java  | 1349 ++++++++++++++++++++
 .../qp/logical/crud/GroupByFillQueryOperator.java  |    2 +-
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |  151 ++-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |  139 +-
 .../iotdb/db/qp/physical/crud/MeasurementInfo.java |   37 +-
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |   11 +-
 .../db/qp/physical/crud/RawDataQueryPlan.java      |    3 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |  121 +-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   11 +-
 .../session/IoTDBSessionVectorABDeviceIT.java      |   27 +-
 14 files changed, 1668 insertions(+), 280 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index dc9f52c..2b1509a 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -319,11 +319,8 @@ public class IoTDBAlignByDeviceIT {
           "103,root.vehicle.d0,199,null,",
           "104,root.vehicle.d0,190,null,",
           "105,root.vehicle.d0,199,11.11,",
-          "106,root.vehicle.d0,null,null,",
           "1000,root.vehicle.d0,55555,1000.11,",
           "946684800000,root.vehicle.d0,100,null,",
-          "1,root.vehicle.d1,null,null,",
-          "1000,root.vehicle.d1,null,null,",
         };
 
     try (Connection connection = EnvFactory.getEnv().getConnection();
@@ -357,7 +354,7 @@ public class IoTDBAlignByDeviceIT {
           Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
           cnt++;
         }
-        Assert.assertEquals(16, cnt);
+        Assert.assertEquals(13, cnt);
       }
     } catch (Exception e) {
       e.printStackTrace();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 8d35ee5..6750588 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -587,14 +587,14 @@ public class IoTDBSimpleQueryIT {
 
         resultSet = statement.executeQuery("select * from root.** align by device");
         // has time and device columns
-        Assert.assertEquals(2, resultSet.getMetaData().getColumnCount());
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
         while (resultSet.next()) {
           fail();
         }
 
         resultSet = statement.executeQuery("select count(*) from root align by device");
         // has device column
-        Assert.assertEquals(1, resultSet.getMetaData().getColumnCount());
+        Assert.assertEquals(2, resultSet.getMetaData().getColumnCount());
         while (resultSet.next()) {
           fail();
         }
@@ -604,7 +604,7 @@ public class IoTDBSimpleQueryIT {
                 "select count(*) from root where time >= 1 and time <= 100 "
                     + "group by ([0, 100), 20ms, 20ms) align by device");
         // has time and device columns
-        Assert.assertEquals(2, resultSet.getMetaData().getColumnCount());
+        Assert.assertEquals(3, resultSet.getMetaData().getColumnCount());
         while (resultSet.next()) {
           fail();
         }
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java
index 5f37945..5d3ff74 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAggregationWithoutValueFilterIT.java
@@ -23,14 +23,24 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.itbase.category.LocalStandaloneTest;
 import org.apache.iotdb.jdbc.Config;
 
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @Category({LocalStandaloneTest.class})
 public class IoTDBAggregationWithoutValueFilterIT {
@@ -236,7 +246,7 @@ public class IoTDBAggregationWithoutValueFilterIT {
 
       boolean hasResultSet =
           statement.execute(
-              "select count(s1),count (s2),count (s3),sum(s1),sum(s2),sum(s3),avg(s1),avg(s2),avg(s3) from root.sg1.d1");
+              "select count(s1),count(s2),count(s3),sum(s1),sum(s2),sum(s3),avg(s1),avg(s2),avg(s3) from root.sg1.d1");
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDevice2IT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDevice2IT.java
new file mode 100644
index 0000000..6338bb5
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDevice2IT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/** Let one chunk have more than one page. */
+@Category({LocalStandaloneTest.class})
+public class IoTDBAlignByDevice2IT extends IoTDBAlignByDeviceIT {
+
+  private static int numOfPointsPerPage;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    // TODO When the aligned time series support compaction, we need to set compaction to true
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    numOfPointsPerPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(3);
+    AlignedWriteUtil.insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(numOfPointsPerPage);
+    EnvironmentUtils.cleanEnv();
+  }
+}
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDeviceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDeviceIT.java
new file mode 100644
index 0000000..6f429e4
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/aligned/IoTDBAlignByDeviceIT.java
@@ -0,0 +1,1349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.aligned;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.iotdb.db.constant.TestConstant.avg;
+import static org.apache.iotdb.db.constant.TestConstant.count;
+import static org.apache.iotdb.db.constant.TestConstant.firstValue;
+import static org.apache.iotdb.db.constant.TestConstant.lastValue;
+import static org.apache.iotdb.db.constant.TestConstant.maxTime;
+import static org.apache.iotdb.db.constant.TestConstant.maxValue;
+import static org.apache.iotdb.db.constant.TestConstant.minTime;
+import static org.apache.iotdb.db.constant.TestConstant.minValue;
+import static org.apache.iotdb.db.constant.TestConstant.sum;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test specifically covers aligned time series, which distinguishes it from
+ * integration/IoTDBAlignByDeviceIT.
+ */
+@Category({LocalStandaloneTest.class})
+public class IoTDBAlignByDeviceIT {
+
+  private static final double DELTA = 1e-6;
+  private static final String TIMESTAMP_STR = "Time";
+  protected static boolean enableSeqSpaceCompaction;
+  protected static boolean enableUnseqSpaceCompaction;
+  protected static boolean enableCrossSpaceCompaction;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.envSetUp();
+    // TODO When the aligned time series support compaction, we need to set compaction to true
+    enableSeqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableSeqSpaceCompaction();
+    enableUnseqSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableUnseqSpaceCompaction();
+    enableCrossSpaceCompaction =
+        IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqSpaceCompaction(false);
+    IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
+    AlignedWriteUtil.insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IoTDBDescriptor.getInstance().getConfig().setEnableSeqSpaceCompaction(enableSeqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableUnseqSpaceCompaction(enableUnseqSpaceCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setEnableCrossSpaceCompaction(enableCrossSpaceCompaction);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void selectAllAlignedWithoutValueFilterTest() throws ClassNotFoundException {
+
+    String[] retArray =
+        new String[] {
+          "1,root.sg1.d1,1.0,1,1,true,aligned_test1",
+          "2,root.sg1.d1,2.0,2,2,null,aligned_test2",
+          "3,root.sg1.d1,30000.0,null,30000,true,aligned_unseq_test3",
+          "4,root.sg1.d1,4.0,4,null,true,aligned_test4",
+          "5,root.sg1.d1,5.0,5,null,true,aligned_test5",
+          "6,root.sg1.d1,6.0,6,6,true,null",
+          "7,root.sg1.d1,7.0,7,7,false,aligned_test7",
+          "8,root.sg1.d1,8.0,8,8,null,aligned_test8",
+          "9,root.sg1.d1,9.0,9,9,false,aligned_test9",
+          "10,root.sg1.d1,null,10,10,true,aligned_test10",
+          "11,root.sg1.d1,11.0,11,11,null,null",
+          "12,root.sg1.d1,12.0,12,12,null,null",
+          "13,root.sg1.d1,130000.0,130000,130000,true,aligned_unseq_test13",
+          "14,root.sg1.d1,14.0,14,14,null,null",
+          "15,root.sg1.d1,15.0,15,15,null,null",
+          "16,root.sg1.d1,16.0,16,16,null,null",
+          "17,root.sg1.d1,17.0,17,17,null,null",
+          "18,root.sg1.d1,18.0,18,18,null,null",
+          "19,root.sg1.d1,19.0,19,19,null,null",
+          "20,root.sg1.d1,20.0,20,20,null,null",
+          "21,root.sg1.d1,null,null,21,true,null",
+          "22,root.sg1.d1,null,null,22,true,null",
+          "23,root.sg1.d1,230000.0,null,230000,false,null",
+          "24,root.sg1.d1,null,null,24,true,null",
+          "25,root.sg1.d1,null,null,25,true,null",
+          "26,root.sg1.d1,null,null,26,false,null",
+          "27,root.sg1.d1,null,null,27,false,null",
+          "28,root.sg1.d1,null,null,28,false,null",
+          "29,root.sg1.d1,null,null,29,false,null",
+          "30,root.sg1.d1,null,null,30,false,null",
+          "31,root.sg1.d1,null,31,null,null,aligned_test31",
+          "32,root.sg1.d1,null,32,null,null,aligned_test32",
+          "33,root.sg1.d1,null,33,null,null,aligned_test33",
+          "34,root.sg1.d1,null,34,null,null,aligned_test34",
+          "35,root.sg1.d1,null,35,null,null,aligned_test35",
+          "36,root.sg1.d1,null,36,null,null,aligned_test36",
+          "37,root.sg1.d1,null,37,null,null,aligned_test37",
+          "38,root.sg1.d1,null,38,null,null,aligned_test38",
+          "39,root.sg1.d1,null,39,null,null,aligned_test39",
+          "40,root.sg1.d1,null,40,null,null,aligned_test40",
+        };
+
+    String[] columnNames = {"Device", "s1", "s2", "s3", "s4", "s5"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select * from root.sg1.d1 align by device");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void selectAllAlignedAndNonAlignedTest() throws ClassNotFoundException {
+
+    String[] retArray =
+        new String[] {
+          "1,root.sg1.d1,1.0,1,1,true,aligned_test1",
+          "2,root.sg1.d1,2.0,2,2,null,aligned_test2",
+          "3,root.sg1.d1,30000.0,null,30000,true,aligned_unseq_test3",
+          "4,root.sg1.d1,4.0,4,null,true,aligned_test4",
+          "5,root.sg1.d1,5.0,5,null,true,aligned_test5",
+          "6,root.sg1.d1,6.0,6,6,true,null",
+          "7,root.sg1.d1,7.0,7,7,false,aligned_test7",
+          "8,root.sg1.d1,8.0,8,8,null,aligned_test8",
+          "9,root.sg1.d1,9.0,9,9,false,aligned_test9",
+          "10,root.sg1.d1,null,10,10,true,aligned_test10",
+          "11,root.sg1.d1,11.0,11,11,null,null",
+          "12,root.sg1.d1,12.0,12,12,null,null",
+          "13,root.sg1.d1,130000.0,130000,130000,true,aligned_unseq_test13",
+          "14,root.sg1.d1,14.0,14,14,null,null",
+          "15,root.sg1.d1,15.0,15,15,null,null",
+          "16,root.sg1.d1,16.0,16,16,null,null",
+          "17,root.sg1.d1,17.0,17,17,null,null",
+          "18,root.sg1.d1,18.0,18,18,null,null",
+          "19,root.sg1.d1,19.0,19,19,null,null",
+          "20,root.sg1.d1,20.0,20,20,null,null",
+          "21,root.sg1.d1,null,null,21,true,null",
+          "22,root.sg1.d1,null,null,22,true,null",
+          "23,root.sg1.d1,230000.0,null,230000,false,null",
+          "24,root.sg1.d1,null,null,24,true,null",
+          "25,root.sg1.d1,null,null,25,true,null",
+          "26,root.sg1.d1,null,null,26,false,null",
+          "27,root.sg1.d1,null,null,27,false,null",
+          "28,root.sg1.d1,null,null,28,false,null",
+          "29,root.sg1.d1,null,null,29,false,null",
+          "30,root.sg1.d1,null,null,30,false,null",
+          "31,root.sg1.d1,null,31,null,null,aligned_test31",
+          "32,root.sg1.d1,null,32,null,null,aligned_test32",
+          "33,root.sg1.d1,null,33,null,null,aligned_test33",
+          "34,root.sg1.d1,null,34,null,null,aligned_test34",
+          "35,root.sg1.d1,null,35,null,null,aligned_test35",
+          "36,root.sg1.d1,null,36,null,null,aligned_test36",
+          "37,root.sg1.d1,null,37,null,null,aligned_test37",
+          "38,root.sg1.d1,null,38,null,null,aligned_test38",
+          "39,root.sg1.d1,null,39,null,null,aligned_test39",
+          "40,root.sg1.d1,null,40,null,null,aligned_test40",
+          "1,root.sg1.d2,1.0,1,1,true,non_aligned_test1",
+          "2,root.sg1.d2,2.0,2,2,null,non_aligned_test2",
+          "3,root.sg1.d2,3.0,null,3,false,non_aligned_test3",
+          "4,root.sg1.d2,4.0,4,null,true,non_aligned_test4",
+          "5,root.sg1.d2,5.0,5,null,true,non_aligned_test5",
+          "6,root.sg1.d2,6.0,6,6,true,null",
+          "7,root.sg1.d2,7.0,7,7,false,non_aligned_test7",
+          "8,root.sg1.d2,8.0,8,8,null,non_aligned_test8",
+          "9,root.sg1.d2,9.0,9,9,false,non_aligned_test9",
+          "10,root.sg1.d2,null,10,10,true,non_aligned_test10",
+          "11,root.sg1.d2,11.0,11,11,null,null",
+          "12,root.sg1.d2,12.0,12,12,null,null",
+          "13,root.sg1.d2,13.0,13,13,null,null",
+          "14,root.sg1.d2,14.0,14,14,null,null",
+          "15,root.sg1.d2,15.0,15,15,null,null",
+          "16,root.sg1.d2,16.0,16,16,null,null",
+          "17,root.sg1.d2,17.0,17,17,null,null",
+          "18,root.sg1.d2,18.0,18,18,null,null",
+          "19,root.sg1.d2,19.0,19,19,null,null",
+          "20,root.sg1.d2,20.0,20,20,null,null",
+          "21,root.sg1.d2,null,null,21,true,null",
+          "22,root.sg1.d2,null,null,22,true,null",
+          "23,root.sg1.d2,null,null,23,true,null",
+          "24,root.sg1.d2,null,null,24,true,null",
+          "25,root.sg1.d2,null,null,25,true,null",
+          "26,root.sg1.d2,null,null,26,false,null",
+          "27,root.sg1.d2,null,null,27,false,null",
+          "28,root.sg1.d2,null,null,28,false,null",
+          "29,root.sg1.d2,null,null,29,false,null",
+          "30,root.sg1.d2,null,null,30,false,null",
+          "31,root.sg1.d2,null,31,null,null,non_aligned_test31",
+          "32,root.sg1.d2,null,32,null,null,non_aligned_test32",
+          "33,root.sg1.d2,null,33,null,null,non_aligned_test33",
+          "34,root.sg1.d2,null,34,null,null,non_aligned_test34",
+          "35,root.sg1.d2,null,35,null,null,non_aligned_test35",
+          "36,root.sg1.d2,null,36,null,null,non_aligned_test36",
+          "37,root.sg1.d2,null,37,null,null,non_aligned_test37",
+          "38,root.sg1.d2,null,38,null,null,non_aligned_test38",
+          "39,root.sg1.d2,null,39,null,null,non_aligned_test39",
+          "40,root.sg1.d2,null,40,null,null,non_aligned_test40",
+        };
+
+    String[] columnNames = {"Device", "s1", "s2", "s3", "s4", "s5"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select * from root.sg1.* align by device");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void selectAllAlignedWithTimeFilterTest() throws ClassNotFoundException {
+
+    String[] retArray =
+        new String[] {
+          "9,root.sg1.d1,9.0,9,9,false,aligned_test9",
+          "10,root.sg1.d1,null,10,10,true,aligned_test10",
+          "11,root.sg1.d1,11.0,11,11,null,null",
+          "12,root.sg1.d1,12.0,12,12,null,null",
+          "13,root.sg1.d1,130000.0,130000,130000,true,aligned_unseq_test13",
+          "14,root.sg1.d1,14.0,14,14,null,null",
+          "15,root.sg1.d1,15.0,15,15,null,null",
+          "16,root.sg1.d1,16.0,16,16,null,null",
+          "17,root.sg1.d1,17.0,17,17,null,null",
+          "18,root.sg1.d1,18.0,18,18,null,null",
+          "19,root.sg1.d1,19.0,19,19,null,null",
+          "20,root.sg1.d1,20.0,20,20,null,null",
+          "21,root.sg1.d1,null,null,21,true,null",
+          "22,root.sg1.d1,null,null,22,true,null",
+          "23,root.sg1.d1,230000.0,null,230000,false,null",
+          "24,root.sg1.d1,null,null,24,true,null",
+          "25,root.sg1.d1,null,null,25,true,null",
+          "26,root.sg1.d1,null,null,26,false,null",
+          "27,root.sg1.d1,null,null,27,false,null",
+          "28,root.sg1.d1,null,null,28,false,null",
+          "29,root.sg1.d1,null,null,29,false,null",
+          "30,root.sg1.d1,null,null,30,false,null",
+          "31,root.sg1.d1,null,31,null,null,aligned_test31",
+          "32,root.sg1.d1,null,32,null,null,aligned_test32",
+          "33,root.sg1.d1,null,33,null,null,aligned_test33",
+        };
+
+    String[] columnNames = {"Device", "s1", "s2", "s3", "s4", "s5"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select * from root.sg1.d1 where time >= 9 and time <= 33 align by device");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void selectSomeAlignedWithoutValueFilterTest1() throws ClassNotFoundException {
+
+    String[] retArray =
+        new String[] {
+          "1,root.sg1.d1,1.0,true,aligned_test1",
+          "2,root.sg1.d1,2.0,null,aligned_test2",
+          "3,root.sg1.d1,30000.0,true,aligned_unseq_test3",
+          "4,root.sg1.d1,4.0,true,aligned_test4",
+          "5,root.sg1.d1,5.0,true,aligned_test5",
+          "6,root.sg1.d1,6.0,true,null",
+          "7,root.sg1.d1,7.0,false,aligned_test7",
+          "8,root.sg1.d1,8.0,null,aligned_test8",
+          "9,root.sg1.d1,9.0,false,aligned_test9",
+          "10,root.sg1.d1,null,true,aligned_test10",
+          "11,root.sg1.d1,11.0,null,null",
+          "12,root.sg1.d1,12.0,null,null",
+          "13,root.sg1.d1,130000.0,true,aligned_unseq_test13",
+          "14,root.sg1.d1,14.0,null,null",
+          "15,root.sg1.d1,15.0,null,null",
+          "16,root.sg1.d1,16.0,null,null",
+          "17,root.sg1.d1,17.0,null,null",
+          "18,root.sg1.d1,18.0,null,null",
+          "19,root.sg1.d1,19.0,null,null",
+          "20,root.sg1.d1,20.0,null,null",
+          "21,root.sg1.d1,null,true,null",
+          "22,root.sg1.d1,null,true,null",
+          "23,root.sg1.d1,230000.0,false,null",
+          "24,root.sg1.d1,null,true,null",
+          "25,root.sg1.d1,null,true,null",
+          "26,root.sg1.d1,null,false,null",
+          "27,root.sg1.d1,null,false,null",
+          "28,root.sg1.d1,null,false,null",
+          "29,root.sg1.d1,null,false,null",
+          "30,root.sg1.d1,null,false,null",
+          "31,root.sg1.d1,null,null,aligned_test31",
+          "32,root.sg1.d1,null,null,aligned_test32",
+          "33,root.sg1.d1,null,null,aligned_test33",
+          "34,root.sg1.d1,null,null,aligned_test34",
+          "35,root.sg1.d1,null,null,aligned_test35",
+          "36,root.sg1.d1,null,null,aligned_test36",
+          "37,root.sg1.d1,null,null,aligned_test37",
+          "38,root.sg1.d1,null,null,aligned_test38",
+          "39,root.sg1.d1,null,null,aligned_test39",
+          "40,root.sg1.d1,null,null,aligned_test40",
+        };
+
+    String[] columnNames = {"Device", "s1", "s4", "s5"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select s1,s4,s5 from root.sg1.d1 align by device");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void selectSomeAlignedWithoutValueFilterTest2() throws ClassNotFoundException {
+    String[] retArray =
+        new String[] {
+          "1,root.sg1.d1,1.0,true",
+          "2,root.sg1.d1,2.0,null",
+          "3,root.sg1.d1,30000.0,true",
+          "4,root.sg1.d1,4.0,true",
+          "5,root.sg1.d1,5.0,true",
+          "6,root.sg1.d1,6.0,true",
+          "7,root.sg1.d1,7.0,false",
+          "8,root.sg1.d1,8.0,null",
+          "9,root.sg1.d1,9.0,false",
+          "10,root.sg1.d1,null,true",
+          "11,root.sg1.d1,11.0,null",
+          "12,root.sg1.d1,12.0,null",
+          "13,root.sg1.d1,130000.0,true",
+          "14,root.sg1.d1,14.0,null",
+          "15,root.sg1.d1,15.0,null",
+          "16,root.sg1.d1,16.0,null",
+          "17,root.sg1.d1,17.0,null",
+          "18,root.sg1.d1,18.0,null",
+          "19,root.sg1.d1,19.0,null",
+          "20,root.sg1.d1,20.0,null",
+          "21,root.sg1.d1,null,true",
+          "22,root.sg1.d1,null,true",
+          "23,root.sg1.d1,230000.0,false",
+          "24,root.sg1.d1,null,true",
+          "25,root.sg1.d1,null,true",
+          "26,root.sg1.d1,null,false",
+          "27,root.sg1.d1,null,false",
+          "28,root.sg1.d1,null,false",
+          "29,root.sg1.d1,null,false",
+          "30,root.sg1.d1,null,false",
+        };
+
+    String[] columnNames = {"Device", "s1", "s4"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select s1,s4 from root.sg1.d1 align by device");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void selectSomeAlignedWithTimeFilterTest() throws ClassNotFoundException {
+    String[] retArray =
+        new String[] {
+          "16,root.sg1.d1,16.0,null,null",
+          "17,root.sg1.d1,17.0,null,null",
+          "18,root.sg1.d1,18.0,null,null",
+          "19,root.sg1.d1,19.0,null,null",
+          "20,root.sg1.d1,20.0,null,null",
+          "21,root.sg1.d1,null,true,null",
+          "22,root.sg1.d1,null,true,null",
+          "23,root.sg1.d1,230000.0,false,null",
+          "24,root.sg1.d1,null,true,null",
+          "25,root.sg1.d1,null,true,null",
+          "26,root.sg1.d1,null,false,null",
+          "27,root.sg1.d1,null,false,null",
+          "28,root.sg1.d1,null,false,null",
+          "29,root.sg1.d1,null,false,null",
+          "30,root.sg1.d1,null,false,null",
+          "31,root.sg1.d1,null,null,aligned_test31",
+          "32,root.sg1.d1,null,null,aligned_test32",
+          "33,root.sg1.d1,null,null,aligned_test33",
+          "34,root.sg1.d1,null,null,aligned_test34",
+        };
+
+    String[] columnNames = {"Device", "s1", "s4", "s5"};
+
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select s1,s4,s5 from root.sg1.d1 where time >= 16 and time <= 34 align by device");
+      Assert.assertTrue(hasResultSet);
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length + 1, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          builder.append(resultSet.getString(1));
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(",").append(resultSet.getString(index));
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void countAllAlignedWithoutTimeFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"root.sg1.d1", "20", "29", "28", "19", "20"};
+    String[] columnNames = {
+      "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select count(*) from root.sg1.d1 align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void countAllAlignedAndNonAlignedWithoutTimeFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"root.sg1.d1,20,29,28,19,20,", "root.sg1.d2,19,29,28,18,19,"};
+    String[] columnNames = {
+      "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet = statement.execute("select count(*) from root.sg1.* align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          StringBuilder builder = new StringBuilder();
+          for (String columnName : columnNames) {
+            int index = map.get(columnName);
+            builder.append(resultSet.getString(index)).append(",");
+          }
+          assertEquals(retArray[cnt], builder.toString());
+          cnt++;
+        }
+        assertEquals(retArray.length, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void countAllAlignedWithTimeFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"root.sg1.d1", "12", "15", "22", "13", "6"};
+    String[] columnNames = {
+      "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select count(*) from root.sg1.d1 where time >= 9 and time <= 33 align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /** aggregate multi columns of aligned timeseries in one SQL */
+  @Test
+  public void aggregateSomeAlignedWithoutTimeFilterTest() throws ClassNotFoundException {
+    double[] retArray =
+        new double[] {
+          20, 29, 28, 390184, 130549, 390417, 19509.2, 4501.689655172413, 13943.464285714286
+        };
+    String[] columnNames = {
+      "Device",
+      "count(s1)",
+      "count(s2)",
+      "count(s3)",
+      "sum(s1)",
+      "sum(s2)",
+      "sum(s3)",
+      "avg(s1)",
+      "avg(s2)",
+      "avg(s3)",
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select count(s1),count(s2),count(s3),sum(s1),sum(s2),sum(s3),avg(s1),avg(s2),avg(s3) from root.sg1.d1 align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>();
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          double[] ans = new double[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 1; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i - 1] = Double.parseDouble(resultSet.getString(index));
+            assertEquals(retArray[i - 1], ans[i - 1], DELTA);
+          }
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void countAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"root.sg1.d1", "11"};
+    String[] columnNames = {"Device", "count(s4)"};
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute("select count(s4) from root.sg1.d1 where s4 = true align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void aggregationFuncAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray =
+        new String[] {"root.sg1.d1", "8", "42.0", "5.25", "1.0", "9.0", "1", "9", "1.0", "9.0"};
+    String[] columnNames = {
+      "Device",
+      "count(s1)",
+      "sum(s1)",
+      "avg(s1)",
+      "first_value(s1)",
+      "last_value(s1)",
+      "min_time(s1)",
+      "max_time(s1)",
+      "min_value(s1)",
+      "max_value(s1)",
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select count(s1), sum(s1), avg(s1), "
+                  + "first_value(s1), last_value(s1), "
+                  + "min_time(s1), max_time(s1),"
+                  + "max_value(s1), min_value(s1) from root.sg1.d1 where s1 < 10 "
+                  + "align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void countAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"root.sg1.d1", "6", "6", "9", "11", "6"};
+    String[] columnNames = {
+      "Device", "count(s1)", "count(s2)", "count(s3)", "count(s4)", "count(s5)"
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select count(*) from root.sg1.d1 where s4 = true " + "align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void aggregationAllAlignedWithValueFilterTest() throws ClassNotFoundException {
+    String[] retArray = new String[] {"root.sg1.d1", "160016.0", "11", "1", "13"};
+    String[] columnNames = {
+      "Device", "sum(s1)", "count(s4)", "min_value(s3)", "max_time(s2)",
+    };
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      boolean hasResultSet =
+          statement.execute(
+              "select sum(s1), count(s4), min_value(s3), max_time(s2) from root.sg1.d1 where s4 = true "
+                  + "align by device");
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        Map<String, Integer> map = new HashMap<>(); // used to adjust result sequence
+        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+          map.put(resultSetMetaData.getColumnName(i), i);
+        }
+        assertEquals(columnNames.length, resultSetMetaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          String[] ans = new String[columnNames.length];
+          // No need to add time column for aggregation query
+          for (int i = 0; i < columnNames.length; i++) {
+            String columnName = columnNames[i];
+            int index = map.get(columnName);
+            ans[i] = resultSet.getString(index);
+          }
+          assertArrayEquals(retArray, ans);
+          cnt++;
+        }
+        assertEquals(1, cnt);
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Checks count(s1), sum(s2) and avg(s1) of root.sg1.d1 for {@code time > 5}, grouped by
+   * {@code ([1, 41), 10ms)} with ALIGN BY DEVICE. Each returned row is rendered as
+   * "time,device,count,sum,avg" and compared, in order, with the expected rows.
+   */
+  @Test
+  public void countSumAvgGroupByTimeTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,4,40.0,7.5",
+      "11,root.sg1.d1,10,130142.0,13014.2",
+      "21,root.sg1.d1,1,null,230000.0",
+      "31,root.sg1.d1,0,355.0,null"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+                  + "where time > 5 GROUP BY ([1, 41), 10ms) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(count("s1")),
+                  resultSet.getString(sum("s2")),
+                  resultSet.getString(avg("s1")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks max_value(s3), min_value(s1), max_time(s2) and min_time(s3) of root.sg1.d1 for
+   * {@code time > 5}, grouped by {@code ([1, 41), 10ms)} with ALIGN BY DEVICE. Each returned row
+   * is rendered as a comma-separated line and compared, in order, with the expected rows.
+   */
+  @Test
+  public void maxMinValueGroupByTimeTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,10,6.0,10,6",
+      "11,root.sg1.d1,130000,11.0,20,11",
+      "21,root.sg1.d1,230000,230000.0,null,21",
+      "31,root.sg1.d1,null,null,40,null"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+                  + "where time > 5 GROUP BY ([1, 41), 10ms) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(maxValue("s3")),
+                  resultSet.getString(minValue("s1")),
+                  resultSet.getString(maxTime("s2")),
+                  resultSet.getString(minTime("s3")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks last_value(s4) and first_value(s5) of root.sg1.d1 for {@code time > 5 and time < 38},
+   * grouped by {@code ([1, 41), 5ms)} with ALIGN BY DEVICE. Each returned row is rendered as
+   * "time,device,last_value,first_value" and compared, in order, with the expected rows.
+   */
+  @Test
+  public void firstLastGroupByTimeTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,null,null",
+      "6,root.sg1.d1,true,aligned_test7",
+      "11,root.sg1.d1,true,aligned_unseq_test13",
+      "16,root.sg1.d1,null,null",
+      "21,root.sg1.d1,true,null",
+      "26,root.sg1.d1,false,null",
+      "31,root.sg1.d1,null,aligned_test31",
+      "36,root.sg1.d1,null,aligned_test36"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select last_value(s4), first_value(s5) from root.sg1.d1 "
+                  + "where time > 5 and time < 38 GROUP BY ([1, 41), 5ms) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(lastValue("s4")),
+                  resultSet.getString(firstValue("s5")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks {@code count(*)} and {@code last_value(*)} of root.sg1.d1 grouped by
+   * {@code ([1, 41), 10ms)} with ALIGN BY DEVICE. For every returned row the time, the device, the
+   * five counts (s1 to s5) and the five last values (s1 to s5) are joined with commas and
+   * compared, in order, with the expected rows.
+   */
+  @Test
+  public void groupByWithWildcardTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,9,9,8,8,9,9.0,10,10,true,aligned_test10",
+      "11,root.sg1.d1,10,10,10,1,1,20.0,20,20,true,aligned_unseq_test13",
+      "21,root.sg1.d1,1,0,10,10,0,230000.0,null,30,false,null",
+      "31,root.sg1.d1,0,10,0,0,10,null,40,null,null,aligned_test40"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select count(*), last_value(*) from root.sg1.d1 GROUP BY ([1, 41), 10ms) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(count("s1")),
+                  resultSet.getString(count("s2")),
+                  resultSet.getString(count("s3")),
+                  resultSet.getString(count("s4")),
+                  resultSet.getString(count("s5")),
+                  resultSet.getString(lastValue("s1")),
+                  resultSet.getString(lastValue("s2")),
+                  resultSet.getString(lastValue("s3")),
+                  resultSet.getString(lastValue("s4")),
+                  resultSet.getString(lastValue("s5")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks count(s1), sum(s2) and avg(s1) over {@code root.sg1.*} for {@code time > 5}, grouped by
+   * {@code ([1, 41), 4ms, 6ms)} with ALIGN BY DEVICE. The expected result lists all rows of
+   * root.sg1.d1 first, followed by all rows of root.sg1.d2.
+   */
+  @Test
+  public void groupByWithNonAlignedTimeseriesTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,0,null,null",
+      "7,root.sg1.d1,3,34.0,8.0",
+      "13,root.sg1.d1,4,130045.0,32511.25",
+      "19,root.sg1.d1,2,39.0,19.5",
+      "25,root.sg1.d1,0,null,null",
+      "31,root.sg1.d1,0,130.0,null",
+      "37,root.sg1.d1,0,154.0,null",
+      "1,root.sg1.d2,0,null,null",
+      "7,root.sg1.d2,3,34.0,8.0",
+      "13,root.sg1.d2,4,58.0,14.5",
+      "19,root.sg1.d2,2,39.0,19.5",
+      "25,root.sg1.d2,0,null,null",
+      "31,root.sg1.d2,0,130.0,null",
+      "37,root.sg1.d2,0,154.0,null"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select count(s1), sum(s2), avg(s1) from root.sg1.* "
+                  + "where time > 5 GROUP BY ([1, 41), 4ms, 6ms) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(count("s1")),
+                  resultSet.getString(sum("s2")),
+                  resultSet.getString(avg("s1")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks count(s1), sum(s2) and avg(s1) of root.sg1.d1 for {@code time > 5}, grouped by
+   * {@code ([1, 41), 10ms)} with {@code FILL (previous, 15ms)} and ALIGN BY DEVICE. Each returned
+   * row is rendered as "time,device,count,sum,avg" and compared, in order, with the expected rows.
+   */
+  @Test
+  public void countSumAvgPreviousFillTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,4,40.0,7.5",
+      "11,root.sg1.d1,10,130142.0,13014.2",
+      "21,root.sg1.d1,1,130142.0,230000.0",
+      "31,root.sg1.d1,0,355.0,230000.0"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+                  + "where time > 5 GROUP BY ([1, 41), 10ms) FILL (previous, 15ms) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(count("s1")),
+                  resultSet.getString(sum("s2")),
+                  resultSet.getString(avg("s1")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks count(s1), sum(s2) and avg(s1) of root.sg1.d1 for {@code s3 > 5 and time < 30},
+   * grouped by {@code ([1, 36), 5ms)} with {@code FILL (3.14)} and ALIGN BY DEVICE. Each returned
+   * row is rendered as "time,device,count,sum,avg" and compared, in order, with the expected rows.
+   */
+  @Test
+  public void countSumAvgValueFillTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,1,3.14,30000.0",
+      "6,root.sg1.d1,4,40.0,7.5",
+      "11,root.sg1.d1,5,130052.0,26010.4",
+      "16,root.sg1.d1,5,90.0,18.0",
+      "21,root.sg1.d1,1,3.14,230000.0",
+      "26,root.sg1.d1,0,3.14,3.14",
+      "31,root.sg1.d1,0,3.14,3.14"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select count(s1), sum(s2), avg(s1) from root.sg1.d1 "
+                  + "where s3 > 5 and time < 30 GROUP BY ([1, 36), 5ms) FILL (3.14) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(count("s1")),
+                  resultSet.getString(sum("s2")),
+                  resultSet.getString(avg("s1")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks max_value(s3), min_value(s1), max_time(s2) and min_time(s3) of root.sg1.d1 for
+   * {@code s1 > 5 and time < 35}, grouped by {@code ([1, 41), 10ms)} with
+   * {@code FILL(previousUntilLast)} and ALIGN BY DEVICE. Each returned row is rendered as a
+   * comma-separated line and compared, in order, with the expected rows.
+   */
+  @Test
+  public void maxMinValueTimePreviousUntilLastFillTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,30000,6.0,9,3",
+      "11,root.sg1.d1,130000,11.0,20,11",
+      "21,root.sg1.d1,230000,230000.0,null,23",
+      "31,root.sg1.d1,null,null,null,null"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+                  + "where s1 > 5 and time < 35 GROUP BY ([1, 41), 10ms) FILL(previousUntilLast) align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(maxValue("s3")),
+                  resultSet.getString(minValue("s1")),
+                  resultSet.getString(maxTime("s2")),
+                  resultSet.getString(minTime("s3")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+
+  /**
+   * Checks max_value(s3), min_value(s1), max_time(s2) and min_time(s3) of root.sg1.d1 for
+   * {@code s3 > 5 and time < 30}, grouped by {@code ([1, 31), 5ms)} with
+   * {@code FILL ('fill string')} and ALIGN BY DEVICE. Each returned row is rendered as a
+   * comma-separated line and compared, in order, with the expected rows (which still contain
+   * "null" entries).
+   */
+  @Test
+  public void maxMinValueTimeValueFillTest() throws SQLException {
+    String[] expectedRows = {
+      "1,root.sg1.d1,30000,30000.0,null,3",
+      "6,root.sg1.d1,10,6.0,10,6",
+      "11,root.sg1.d1,130000,11.0,15,11",
+      "16,root.sg1.d1,20,16.0,20,16",
+      "21,root.sg1.d1,230000,230000.0,null,21",
+      "26,root.sg1.d1,29,null,null,26"
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      Assert.assertTrue(
+          statement.execute(
+              "select max_value(s3), min_value(s1), max_time(s2), min_time(s3) from root.sg1.d1 "
+                  + "where s3 > 5 and time < 30 GROUP BY ([1, 31), 5ms) FILL ('fill string') align by device"));
+
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int rowIndex = 0;
+        for (; resultSet.next(); rowIndex++) {
+          // String.join renders a null element as "null", exactly like string concatenation.
+          String actualRow =
+              String.join(
+                  ",",
+                  resultSet.getString(TIMESTAMP_STR),
+                  resultSet.getString("Device"),
+                  resultSet.getString(maxValue("s3")),
+                  resultSet.getString(minValue("s1")),
+                  resultSet.getString(maxTime("s2")),
+                  resultSet.getString(minTime("s3")));
+          Assert.assertEquals(expectedRows[rowIndex], actualRow);
+        }
+        Assert.assertEquals(expectedRows.length, rowIndex);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
index 485432d..f6f71c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
@@ -41,7 +41,7 @@ public class GroupByFillQueryOperator extends GroupByQueryOperator {
   protected AlignByDevicePlan generateAlignByDevicePlan(PhysicalGenerator generator)
       throws QueryProcessException {
     AlignByDevicePlan alignByDevicePlan = super.generateAlignByDevicePlan(generator);
-    alignByDevicePlan.setGroupByTimePlan(initGroupByTimeFillPlan(new GroupByTimeFillPlan()));
+    alignByDevicePlan.setGroupByFillPlan(initGroupByTimeFillPlan(new GroupByTimeFillPlan()));
 
     return alignByDevicePlan;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 134d57d..4653ed6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -23,14 +23,12 @@ import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.index.common.IndexType;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -41,7 +39,6 @@ import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
 import org.apache.iotdb.db.service.IoTDB;
-import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
@@ -54,7 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.iotdb.db.utils.SchemaUtils.getAggregationType;
+import static org.apache.iotdb.db.utils.SchemaUtils.getSeriesTypeByPath;
 
 public class QueryOperator extends Operator {
 
@@ -173,8 +170,17 @@ public class QueryOperator extends Operator {
   }
 
   public void check() throws LogicalOperatorException {
-    if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) {
-      throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries.");
+    if (isAlignByDevice()) {
+      if (selectComponent.hasTimeSeriesGeneratingFunction()) {
+        throw new LogicalOperatorException(
+            "ALIGN BY DEVICE clause is not supported in UDF queries.");
+      }
+
+      for (PartialPath path : selectComponent.getPaths()) {
+        if (path.getNodes().length > 1) {
+          throw new LogicalOperatorException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
+        }
+      }
     }
   }
 
@@ -235,105 +241,99 @@ public class QueryOperator extends Operator {
       throws QueryProcessException {
     AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
 
-    List<PartialPath> prefixPaths = fromComponent.getPrefixPaths();
     // remove stars in fromPaths and get deviceId with deduplication
-    List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
-    List<ResultColumn> resultColumns = selectComponent.getResultColumns();
+    List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths());
+    List<ResultColumn> resultColumns =
+        convertSpecialClauseValues(alignByDevicePlan, selectComponent.getResultColumns());
     List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
     // to record result measurement columns
     List<String> measurements = new ArrayList<>();
     Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
     List<PartialPath> paths = new ArrayList<>();
+    List<String> aggregations = new ArrayList<>();
 
-    for (int i = 0; i < resultColumns.size(); i++) { // per suffix in SELECT
+    // per suffix in SELECT
+    for (int i = 0; i < resultColumns.size(); i++) {
       ResultColumn resultColumn = resultColumns.get(i);
-      Expression suffixExpression = resultColumn.getExpression();
-      PartialPath suffixPath = getSuffixPathFromExpression(suffixExpression);
+      PartialPath suffixPath = getSuffixPathFromExpression(resultColumn.getExpression());
       String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null;
-
       // to record measurements in the loop of a suffix path
       Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+      // concatenate the suffix path with each device
       for (PartialPath device : devices) {
         PartialPath fullPath = device.concatPath(suffixPath);
         try {
           // remove stars in SELECT to get actual paths
           List<MeasurementPath> actualPaths = getMatchedTimeseries(fullPath);
-          if (suffixPath.getNodes().length > 1) {
-            throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
-          }
           if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
             throw new QueryProcessException(
                 String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias()));
           }
-          if (actualPaths.isEmpty()) {
-            String nonExistMeasurement = getMeasurementName(fullPath, aggregation);
-            if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
-              measurementInfoMap.putIfAbsent(
-                  nonExistMeasurement, new MeasurementInfo(MeasurementType.NonExist));
+          for (MeasurementPath path : actualPaths) {
+            MeasurementInfo measurementInfo =
+                new MeasurementInfo(getMeasurementName(path, aggregation));
+            TSDataType columnDataType = getSeriesTypeByPath(path, aggregation);
+            if (aggregation != null) {
+              aggregations.add(aggregation);
             }
-          } else {
-            for (PartialPath path : actualPaths) {
-              String measurementName = getMeasurementName(path, aggregation);
-              TSDataType measurementDataType = path.getSeriesType();
-              TSDataType columnDataType = getAggregationType(aggregation);
-              columnDataType = columnDataType == null ? measurementDataType : columnDataType;
-              MeasurementInfo measurementInfo =
-                  measurementInfoMap.getOrDefault(measurementName, new MeasurementInfo());
-
-              if (resultColumn.hasAlias()) {
-                measurementInfo.setMeasurementAlias(resultColumn.getAlias());
-              }
-
-              // check datatype consistency
-              // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
-              // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
-              if (measurementInfo.getColumnDataType() != null) {
-                if (!columnDataType.equals(measurementInfo.getColumnDataType())) {
-                  throw new QueryProcessException(
-                      "The data types of the same measurement column should be the same across devices.");
-                }
-              } else {
-                measurementInfo.setColumnDataType(columnDataType);
-                measurementInfo.setMeasurementDataType(measurementDataType);
-              }
-
-              measurementSetOfGivenSuffix.add(measurementName);
-              measurementInfo.setMeasurementType(MeasurementType.Exist);
-              measurementInfoMap.put(measurementName, measurementInfo);
-              // update paths
-              paths.add(path);
+            checkDataTypeConsistency(
+                columnDataType, measurementInfoMap.get(measurementInfo.getMeasurement()));
+
+            if (!measurementInfoMap.containsKey(measurementInfo.getMeasurement())) {
+              measurementInfo.setMeasurementAlias(
+                  resultColumn.hasAlias() ? resultColumn.getAlias() : null);
+              measurementInfo.setColumnDataType(columnDataType);
+              measurementInfoMap.put(measurementInfo.getMeasurement(), measurementInfo);
             }
+            measurementSetOfGivenSuffix.add(measurementInfo.getMeasurement());
+            paths.add(path);
           }
         } catch (MetadataException | QueryProcessException e) {
           throw new QueryProcessException(e.getMessage());
         }
       }
 
-      // Note that in the loop of a suffix path, set is used.
-      // And across the loops of suffix paths, list is used.
-      // e.g. select *,s1 from root.sg.d0, root.sg.d1
-      // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
-      // for suffix s1, measurementSetOfGivenSuffix = {s1}
-      // therefore the final measurements is [s1,s2,s3,s1].
-      measurements.addAll(measurementSetOfGivenSuffix);
+      if (measurementSetOfGivenSuffix.isEmpty()) {
+        measurements.add(suffixPath.toString());
+      } else {
+        // Note that in the loop of a suffix path, set is used.
+        // And across the loops of suffix paths, list is used.
+        // e.g. select *,s1 from root.sg.d0, root.sg.d1
+        // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3}
+        // for suffix s1, measurementSetOfGivenSuffix = {s1}
+        // therefore the final measurements is [s1,s2,s3,s1].
+        measurements.addAll(measurementSetOfGivenSuffix);
+      }
     }
 
-    List<String> trimMeasurements = convertSpecialClauseValues(alignByDevicePlan, measurements);
     // assigns to alignByDevicePlan
-    alignByDevicePlan.setMeasurements(trimMeasurements);
-    alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
-    alignByDevicePlan.setDevices(devices);
+    alignByDevicePlan.setMeasurements(measurements);
     alignByDevicePlan.setPaths(paths);
+    alignByDevicePlan.setAggregations(aggregations);
+    alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
     alignByDevicePlan.setEnableTracing(enableTracing);
 
+    alignByDevicePlan.deduplicate(generator);
+
     if (whereComponent != null) {
       alignByDevicePlan.setDeviceToFilterMap(
-          concatFilterByDevice(devices, whereComponent.getFilterOperator()));
+          concatFilterByDevice(alignByDevicePlan, devices, whereComponent.getFilterOperator()));
     }
 
     return alignByDevicePlan;
   }
 
+  private void checkDataTypeConsistency(TSDataType checkedDataType, MeasurementInfo measurementInfo)
+      throws QueryProcessException {
+    // check datatype consistency
+    // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
+    // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+    if (measurementInfo != null && !checkedDataType.equals(measurementInfo.getColumnDataType())) {
+      throw new QueryProcessException(AlignByDevicePlan.DATATYPE_ERROR_MESSAGE);
+    }
+  }
+
   protected void convertSpecialClauseValues(QueryPlan queryPlan) {
     if (specialClauseComponent != null) {
       queryPlan.setWithoutAllNull(specialClauseComponent.isWithoutAllNull());
@@ -345,16 +345,16 @@ public class QueryOperator extends Operator {
     }
   }
 
-  private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements)
-      throws QueryProcessException {
+  private List<ResultColumn> convertSpecialClauseValues(
+      QueryPlan queryPlan, List<ResultColumn> resultColumns) throws QueryProcessException {
     convertSpecialClauseValues(queryPlan);
     // sLimit trim on the measurementColumnList
     if (specialClauseComponent.hasSlimit()) {
       int seriesSLimit = specialClauseComponent.getSeriesLimit();
       int seriesOffset = specialClauseComponent.getSeriesOffset();
-      return slimitTrimColumn(measurements, seriesSLimit, seriesOffset);
+      return slimitTrimColumn(resultColumns, seriesSLimit, seriesOffset);
     }
-    return measurements;
+    return resultColumns;
   }
 
   private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths)
@@ -385,19 +385,16 @@ public class QueryOperator extends Operator {
    */
   private String getMeasurementName(PartialPath path, String aggregation) {
     String initialMeasurement = path.getMeasurement();
-    if (path instanceof AlignedPath) {
-      String subMeasurement = ((AlignedPath) path).getMeasurement(0);
-      initialMeasurement += TsFileConstant.PATH_SEPARATOR + subMeasurement;
-    }
     if (aggregation != null) {
       initialMeasurement = aggregation + "(" + initialMeasurement + ")";
     }
     return initialMeasurement;
   }
 
-  private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset)
+  private List<ResultColumn> slimitTrimColumn(
+      List<ResultColumn> resultColumns, int seriesLimit, int seriesOffset)
       throws QueryProcessException {
-    int size = columnList.size();
+    int size = resultColumns.size();
 
     // check parameter range
     if (seriesOffset >= size) {
@@ -411,14 +408,15 @@ public class QueryOperator extends Operator {
     }
 
     // trim seriesPath list
-    return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
+    return new ArrayList<>(resultColumns.subList(seriesOffset, endPosition));
   }
 
   // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
   // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
   //  root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
   private Map<String, IExpression> concatFilterByDevice(
-      List<PartialPath> devices, FilterOperator operator) throws QueryProcessException {
+      AlignByDevicePlan alignByDevicePlan, List<PartialPath> devices, FilterOperator operator)
+      throws QueryProcessException {
     Map<String, IExpression> deviceToFilterMap = new HashMap<>();
     Set<PartialPath> filterPaths = new HashSet<>();
     Iterator<PartialPath> deviceIterator = devices.iterator();
@@ -430,6 +428,7 @@ public class QueryOperator extends Operator {
         concatFilterPath(device, newOperator, filterPaths);
       } catch (LogicalOptimizeException | MetadataException e) {
         deviceIterator.remove();
+        alignByDevicePlan.removeDevice(device.getFullPath());
         continue;
       }
       // transform to a list so it can be indexed
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index 37b3a1f..7132cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -41,17 +42,21 @@ public class AlignByDevicePlan extends QueryPlan {
       "The paths of the SELECT clause can only be measurements or STAR.";
   public static final String ALIAS_ERROR_MESSAGE =
       "alias %s can only be matched with one time series";
+  public static final String DATATYPE_ERROR_MESSAGE =
+      "The data types of the same measurement column should be the same across devices.";
 
   // to record result measurement columns, e.g. temperature, status, speed
   private List<String> measurements;
-  private List<TSDataType> dataTypes;
   private Map<String, MeasurementInfo> measurementInfoMap;
+  private List<PartialPath> deduplicatePaths = new ArrayList<>();
+  private List<String> aggregations;
 
-  // to check data type consistency for the same name sensor of different devices
-  private List<PartialPath> devices;
+  // for each device, the indexes in deduplicatePaths of the paths that need to be executed
+  private Map<String, List<Integer>> deviceToPathIndex = new LinkedHashMap<>();
   private Map<String, IExpression> deviceToFilterMap;
 
   private GroupByTimePlan groupByTimePlan;
+  private GroupByTimeFillPlan groupByFillPlan;
   private FillQueryPlan fillQueryPlan;
   private AggregationPlan aggregationPlan;
 
@@ -61,7 +66,41 @@ public class AlignByDevicePlan extends QueryPlan {
 
   @Override
   public void deduplicate(PhysicalGenerator physicalGenerator) {
-    // do nothing
+    Set<String> pathWithAggregationSet = new LinkedHashSet<>();
+    List<String> deduplicatedAggregations = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = paths.get(i);
+      String aggregation = aggregations != null ? aggregations.get(i) : null;
+      String pathStrWithAggregation = getPathStrWithAggregation(path, aggregation);
+      if (!pathWithAggregationSet.contains(pathStrWithAggregation)) {
+        pathWithAggregationSet.add(pathStrWithAggregation);
+        deduplicatePaths.add(path);
+        if (this.aggregations != null) {
+          deduplicatedAggregations.add(this.aggregations.get(i));
+        }
+        deviceToPathIndex
+            .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+            .add(deduplicatePaths.size() - 1);
+      }
+    }
+    setAggregations(deduplicatedAggregations);
+    this.paths = null;
+  }
+
+  public List<PartialPath> getDeduplicatePaths() {
+    return deduplicatePaths;
+  }
+
+  public void removeDevice(String device) {
+    deviceToPathIndex.remove(device);
+  }
+
+  public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
+    this.measurementInfoMap = measurementInfoMap;
+  }
+
+  public Map<String, MeasurementInfo> getMeasurementInfoMap() {
+    return measurementInfoMap;
   }
 
   @Override
@@ -69,51 +108,32 @@ public class AlignByDevicePlan extends QueryPlan {
     TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
 
     List<String> respColumns = new ArrayList<>();
-    List<String> columnsTypes = new ArrayList<>();
+    List<String> columnTypes = new ArrayList<>();
 
     // the DEVICE column of ALIGN_BY_DEVICE result
     respColumns.add(SQLConstant.ALIGNBY_DEVICE_COLUMN_NAME);
-    columnsTypes.add(TSDataType.TEXT.toString());
-
-    // get column types and do deduplication
-    List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
-    deduplicatedColumnsType.add(TSDataType.TEXT);
+    columnTypes.add(TSDataType.TEXT.toString());
 
     Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
-    Map<String, MeasurementInfo> measurementInfoMap = getMeasurementInfoMap();
-
     // build column header with constant and non exist column and deduplication
-    List<String> measurements = getMeasurements();
     for (String measurement : measurements) {
       MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
       TSDataType type = TSDataType.TEXT;
-      switch (measurementInfo.getMeasurementType()) {
-        case Exist:
-          type = measurementInfo.getColumnDataType();
-          break;
-        case NonExist:
-        case Constant:
-          type = TSDataType.TEXT;
+      String measurementName = measurement;
+      if (measurementInfo != null) {
+        type = measurementInfo.getColumnDataType();
+        measurementName = measurementInfo.getMeasurementAlias();
       }
-      String measurementAlias = measurementInfo.getMeasurementAlias();
-      respColumns.add(measurementAlias != null ? measurementAlias : measurement);
-      columnsTypes.add(type.toString());
+      respColumns.add(measurementName != null ? measurementName : measurement);
+      columnTypes.add(type.toString());
 
-      if (!deduplicatedMeasurements.contains(measurement)) {
-        deduplicatedMeasurements.add(measurement);
-        deduplicatedColumnsType.add(type);
-      }
+      deduplicatedMeasurements.add(measurement);
     }
 
-    // save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
-    // i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
-    setMeasurements(new ArrayList<>(deduplicatedMeasurements));
-    setDataTypes(deduplicatedColumnsType);
-
-    // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
-    setPaths(null);
+    // save deduplicated measurements in AlignByDevicePlan for AlignByDeviceDataSet to use.
+    measurements = new ArrayList<>(deduplicatedMeasurements);
     resp.setColumns(respColumns);
-    resp.setDataTypeList(columnsTypes);
+    resp.setDataTypeList(columnTypes);
     if (getOperatorType() == OperatorType.AGGREGATION) {
       resp.setIgnoreTimeStamp(true);
     }
@@ -129,20 +149,20 @@ public class AlignByDevicePlan extends QueryPlan {
   }
 
   @Override
-  public List<TSDataType> getDataTypes() {
-    return dataTypes;
+  public List<String> getAggregations() {
+    return aggregations;
   }
 
-  public void setDataTypes(List<TSDataType> dataTypes) {
-    this.dataTypes = dataTypes;
+  public void setAggregations(List<String> aggregations) {
+    this.aggregations = aggregations.isEmpty() ? null : aggregations;
   }
 
-  public void setDevices(List<PartialPath> devices) {
-    this.devices = devices;
+  public Map<String, List<Integer>> getDeviceToPathIndex() {
+    return deviceToPathIndex;
   }
 
-  public List<PartialPath> getDevices() {
-    return devices;
+  public void setDeviceToPathIndex(Map<String, List<Integer>> deviceToPathIndex) {
+    this.deviceToPathIndex = deviceToPathIndex;
   }
 
   public Map<String, IExpression> getDeviceToFilterMap() {
@@ -162,6 +182,15 @@ public class AlignByDevicePlan extends QueryPlan {
     this.setOperatorType(OperatorType.GROUP_BY_TIME);
   }
 
+  public GroupByTimeFillPlan getGroupByFillPlan() {
+    return groupByFillPlan;
+  }
+
+  public void setGroupByFillPlan(GroupByTimeFillPlan groupByFillPlan) {
+    this.groupByFillPlan = groupByFillPlan;
+    this.setOperatorType(OperatorType.GROUP_BY_FILL);
+  }
+
   public FillQueryPlan getFillQueryPlan() {
     return fillQueryPlan;
   }
@@ -180,23 +209,11 @@ public class AlignByDevicePlan extends QueryPlan {
     this.setOperatorType(Operator.OperatorType.AGGREGATION);
   }
 
-  public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
-    this.measurementInfoMap = measurementInfoMap;
-  }
-
-  public Map<String, MeasurementInfo> getMeasurementInfoMap() {
-    return measurementInfoMap;
-  }
-
-  /**
-   * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
-   * that do not exist in any device, data type is considered as String. The value is considered as
-   * null. Constant: the measurements that have quotation mark. e.g. "abc",'11'. The data type is
-   * considered as String and the value is the measurement name.
-   */
-  public enum MeasurementType {
-    Exist,
-    NonExist,
-    Constant
+  private String getPathStrWithAggregation(PartialPath path, String aggregation) {
+    String initialPath = path.getFullPath();
+    if (aggregation != null) {
+      initialPath = aggregation + "(" + initialPath + ")";
+    }
+    return initialPath;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
index 7c6dd17..c91493c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
@@ -18,51 +18,38 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class MeasurementInfo {
 
   public MeasurementInfo() {}
 
-  public MeasurementInfo(MeasurementType measurementType) {
-    this.measurementType = measurementType;
+  public MeasurementInfo(String measurement) {
+    this.measurement = measurement;
   }
 
+  private String measurement;
+
   // select s1, s2 as speed from root, then s2 -> speed
   private String measurementAlias;
 
-  // to record different kinds of measurement
-  private MeasurementType measurementType;
-
-  // to record the real type of the measurement, used for actual query
-  private TSDataType measurementDataType;
-
   // to record the datatype of the column in the result set
   private TSDataType columnDataType;
 
-  public void setMeasurementAlias(String measurementAlias) {
-    this.measurementAlias = measurementAlias;
+  public void setMeasurement(String measurement) {
+    this.measurement = measurement;
   }
 
-  public String getMeasurementAlias() {
-    return measurementAlias;
+  public String getMeasurement() {
+    return measurement;
   }
 
-  public void setMeasurementType(MeasurementType measurementType) {
-    this.measurementType = measurementType;
-  }
-
-  public MeasurementType getMeasurementType() {
-    return measurementType;
-  }
-
-  public void setMeasurementDataType(TSDataType measurementDataType) {
-    this.measurementDataType = measurementDataType;
+  public void setMeasurementAlias(String measurementAlias) {
+    this.measurementAlias = measurementAlias;
   }
 
-  public TSDataType getMeasurementDataType() {
-    return measurementDataType;
+  public String getMeasurementAlias() {
+    return measurementAlias;
   }
 
   public void setColumnDataType(TSDataType columnDataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index 10afc95..d8dd931 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -126,14 +126,11 @@ public abstract class QueryPlan extends PhysicalPlan {
 
   @Override
   public void setPaths(List<PartialPath> paths) {
-    if (paths == null) this.paths = null; // align by device
-    else {
-      List<MeasurementPath> measurementPaths = new ArrayList<>();
-      for (PartialPath path : paths) {
-        measurementPaths.add((MeasurementPath) path);
-      }
-      this.paths = measurementPaths;
+    List<MeasurementPath> measurementPaths = new ArrayList<>();
+    for (PartialPath path : paths) {
+      measurementPaths.add((MeasurementPath) path);
     }
+    this.paths = measurementPaths;
   }
 
   public List<TSDataType> getDataTypes() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 863a586..0f63271 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -128,9 +128,10 @@ public class RawDataQueryPlan extends QueryPlan {
         path -> {
           Set<String> set =
               deviceToMeasurements.computeIfAbsent(path.getDevice(), key -> new HashSet<>());
-          set.add(path.getMeasurement());
           if (path instanceof AlignedPath) {
             set.addAll(((AlignedPath) path).getMeasurementList());
+          } else {
+            set.add(path.getMeasurement());
           }
         });
     this.deduplicatedPaths = deduplicatedPaths;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 620bfae..e138bb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -19,20 +19,17 @@
 package org.apache.iotdb.db.query.dataset;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
 import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
 import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.IQueryRouter;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -48,8 +45,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 
 /** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */
 public class AlignByDeviceDataSet extends QueryDataSet {
@@ -60,39 +55,48 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   private IExpression expression;
 
   private List<String> measurements;
-  private List<PartialPath> devices;
+  private List<PartialPath> paths;
+  private List<String> aggregations;
+  private Map<String, List<Integer>> deviceToPathIndex;
   private Map<String, IExpression> deviceToFilterMap;
-  private Map<String, MeasurementInfo> measurementInfoMap;
 
   private GroupByTimePlan groupByTimePlan;
+  private GroupByTimeFillPlan groupByFillPlan;
   private FillQueryPlan fillQueryPlan;
   private AggregationPlan aggregationPlan;
   private RawDataQueryPlan rawDataQueryPlan;
 
   private boolean curDataSetInitialized;
-  private PartialPath currentDevice;
   private QueryDataSet currentDataSet;
-  private Iterator<PartialPath> deviceIterator;
+  private Iterator<String> deviceIterator;
+  private String currentDevice;
   private List<String> executeColumns;
   private int pathsNum = 0;
 
   public AlignByDeviceDataSet(
       AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) {
-    super(null, alignByDevicePlan.getDataTypes());
+    super(null, null);
     // align by device's column number is different from other datasets
     // TODO I don't know whether it's right or not in AlignedPath, remember to check here while
     // adapting AlignByDevice query for new vector
-    super.columnNum = alignByDevicePlan.getDataTypes().size();
+    super.columnNum = alignByDevicePlan.getMeasurements().size() + 1; // + 1 for 'device'
     this.measurements = alignByDevicePlan.getMeasurements();
-    this.devices = alignByDevicePlan.getDevices();
-    this.measurementInfoMap = alignByDevicePlan.getMeasurementInfoMap();
+    this.paths = alignByDevicePlan.getDeduplicatePaths();
+    this.aggregations = alignByDevicePlan.getAggregations();
     this.queryRouter = queryRouter;
     this.context = context;
+    this.deviceIterator = alignByDevicePlan.getDeviceToPathIndex().keySet().iterator();
+    this.deviceToPathIndex = alignByDevicePlan.getDeviceToPathIndex();
     this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
 
     switch (alignByDevicePlan.getOperatorType()) {
+      case GROUP_BY_FILL:
+        this.dataSetType = DataSetType.GROUP_BY_FILL;
+        this.groupByFillPlan = alignByDevicePlan.getGroupByFillPlan();
+        this.groupByFillPlan.setAscending(alignByDevicePlan.isAscending());
+        break;
       case GROUP_BY_TIME:
-        this.dataSetType = DataSetType.GROUPBYTIME;
+        this.dataSetType = DataSetType.GROUP_BY_TIME;
         this.groupByTimePlan = alignByDevicePlan.getGroupByTimePlan();
         this.groupByTimePlan.setAscending(alignByDevicePlan.isAscending());
         break;
@@ -115,7 +119,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     this.curDataSetInitialized = false;
-    this.deviceIterator = devices.iterator();
   }
 
   public int getPathsNum() {
@@ -133,37 +136,22 @@ public class AlignByDeviceDataSet extends QueryDataSet {
 
     while (deviceIterator.hasNext()) {
       currentDevice = deviceIterator.next();
-      // get all measurements of current device
-      Map<String, MeasurementPath> measurementToPathMap =
-          getMeasurementsUnderGivenDevice(currentDevice);
-      Set<String> measurementOfGivenDevice = measurementToPathMap.keySet();
-
-      // extract paths and aggregations queried from all measurements
-      // executeColumns is for calculating rowRecord
       executeColumns = new ArrayList<>();
       List<PartialPath> executePaths = new ArrayList<>();
       List<String> executeAggregations = new ArrayList<>();
-      for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) {
-        if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
-          continue;
-        }
-        String column = entry.getKey();
-        String measurement = column;
-        if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
-          measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
-          if (measurementOfGivenDevice.contains(measurement)) {
-            executeAggregations.add(column.substring(0, column.indexOf('(')));
-          }
-        }
-        if (measurementOfGivenDevice.contains(measurement)) {
-          executeColumns.add(column);
-          executePaths.add(measurementToPathMap.get(measurement));
+      for (int i : deviceToPathIndex.get(currentDevice)) {
+        executePaths.add(paths.get(i));
+        String executeColumn = paths.get(i).getMeasurement();
+        if (aggregations != null) {
+          executeAggregations.add(aggregations.get(i));
+          executeColumn = String.format("%s(%s)", aggregations.get(i), executeColumn);
         }
+        executeColumns.add(executeColumn);
       }
 
       // get filter to execute for the current device
       if (deviceToFilterMap != null) {
-        this.expression = deviceToFilterMap.get(currentDevice.getFullPath());
+        this.expression = deviceToFilterMap.get(currentDevice);
       }
 
       // for tracing: try to calculate the number of series paths
@@ -173,7 +161,13 @@ public class AlignByDeviceDataSet extends QueryDataSet {
 
       try {
         switch (dataSetType) {
-          case GROUPBYTIME:
+          case GROUP_BY_FILL:
+            groupByFillPlan.setDeduplicatedPathsAndUpdate(executePaths);
+            groupByFillPlan.setDeduplicatedAggregations(executeAggregations);
+            groupByFillPlan.setExpression(expression);
+            currentDataSet = queryRouter.groupByFill(groupByFillPlan, context);
+            break;
+          case GROUP_BY_TIME:
             groupByTimePlan.setDeduplicatedPathsAndUpdate(executePaths);
             groupByTimePlan.setDeduplicatedAggregations(executeAggregations);
             groupByTimePlan.setExpression(expression);
@@ -190,6 +184,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
             currentDataSet = queryRouter.fill(fillQueryPlan, context);
             break;
           case QUERY:
+            // Group all the subSensors of one vector into one VectorPartialPath
+            executePaths = MetaUtils.groupAlignedPaths(executePaths);
             rawDataQueryPlan.setDeduplicatedPathsAndUpdate(executePaths);
             rawDataQueryPlan.setExpression(expression);
             currentDataSet = queryRouter.rawDataQuery(rawDataQueryPlan, context);
@@ -219,23 +215,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     return false;
   }
 
-  /** Get all measurements under given device. */
-  protected Map<String, MeasurementPath> getMeasurementsUnderGivenDevice(PartialPath device)
-      throws IOException {
-    try {
-      // TODO: Implement this method in Cluster MManager
-      Map<String, MeasurementPath> measurementToPathMap = new HashMap<>();
-      List<MeasurementPath> measurementPaths =
-          IoTDB.metaManager.getAllMeasurementByDevicePath(device);
-      for (MeasurementPath measurementPath : measurementPaths) {
-        measurementToPathMap.put(measurementPath.getMeasurement(), measurementPath);
-      }
-      return measurementToPathMap;
-    } catch (MetadataException e) {
-      throw new IOException("Cannot get node from " + device, e);
-    }
-  }
-
   @Override
   public RowRecord nextWithoutConstraint() throws IOException {
     RowRecord originRowRecord = currentDataSet.next();
@@ -243,7 +222,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     RowRecord rowRecord = new RowRecord(originRowRecord.getTimestamp());
 
     Field deviceField = new Field(TSDataType.TEXT);
-    deviceField.setBinaryV(new Binary(currentDevice.getFullPath()));
+    deviceField.setBinaryV(new Binary(currentDevice));
     rowRecord.addField(deviceField);
     // the device field is not a value field, so it should not affect the WITHOUT NULL
     // judgement
@@ -256,23 +235,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
     }
 
     for (String measurement : measurements) {
-      MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
-      switch (measurementInfo.getMeasurementType()) {
-        case Exist:
-          if (currentColumnMap.get(measurement) != null) {
-            rowRecord.addField(currentColumnMap.get(measurement));
-          } else {
-            rowRecord.addField(new Field(null));
-          }
-          break;
-        case NonExist:
-          rowRecord.addField(new Field(null));
-          break;
-        case Constant:
-          Field res = new Field(TSDataType.TEXT);
-          res.setBinaryV(Binary.valueOf(measurement));
-          rowRecord.addField(res);
-          break;
+      if (currentColumnMap.get(measurement) != null) {
+        rowRecord.addField(currentColumnMap.get(measurement));
+      } else {
+        rowRecord.addField(new Field(null));
       }
     }
 
@@ -280,7 +246,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
   }
 
   private enum DataSetType {
-    GROUPBYTIME,
+    GROUP_BY_FILL,
+    GROUP_BY_TIME,
     AGGREGATE,
     FILL,
     QUERY
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 3df6ed7..6066cdb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -153,7 +153,7 @@ public class SchemaUtils {
   }
 
   public static List<TSDataType> getSeriesTypesByPaths(
-      List<MeasurementPath> paths, List<String> aggregations) throws MetadataException {
+      List<MeasurementPath> paths, List<String> aggregations) {
     List<TSDataType> tsDataTypes = new ArrayList<>();
     for (int i = 0; i < paths.size(); i++) {
       String aggrStr = aggregations != null ? aggregations.get(i) : null;
@@ -168,6 +168,15 @@ public class SchemaUtils {
     return tsDataTypes;
   }
 
+  public static TSDataType getSeriesTypeByPath(MeasurementPath path, String aggregation) {
+    TSDataType dataType = getAggregationType(aggregation);
+    if (dataType != null) {
+      return dataType;
+    } else {
+      return path.getSeriesType();
+    }
+  }
+
   /**
    * @param aggregation aggregation function
    * @return the data type of the aggregation, or null if the aggregation is null
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
index b1924a6..ec9cb41 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionVectorABDeviceIT.java
@@ -39,8 +39,6 @@ import java.util.List;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-@Ignore
-// TODO: AlignByDevice
 public class IoTDBSessionVectorABDeviceIT {
   private static final String ROOT_SG1_D1 = "root.sg1.d1";
   private static final String ROOT_SG1_D2 = "root.sg1.d2";
@@ -133,27 +131,14 @@ public class IoTDBSessionVectorABDeviceIT {
       assertEquals("s2", dataSet.getColumnNames().get(3));
 
       long time = 1L;
-      int count = 0;
-      int index = 0;
-      String[] deviceArray = {"root.sg1.d2", "root.sg1.d1"};
+      String device = "root.sg1.d1";
       while (dataSet.hasNext()) {
-        count++;
         RowRecord rowRecord = dataSet.next();
         assertEquals(time, rowRecord.getTimestamp());
-        assertEquals(deviceArray[index], rowRecord.getFields().get(0).getBinaryV().toString());
-        if (index == 1) {
-          assertEquals(time + 1, rowRecord.getFields().get(1).getLongV());
-          assertEquals(time + 2, rowRecord.getFields().get(2).getLongV());
-          assertEquals("null", rowRecord.getFields().get(3).getStringValue());
-        } else {
-          assertEquals(time + 3, rowRecord.getFields().get(1).getLongV());
-          assertEquals(time + 4, rowRecord.getFields().get(2).getLongV());
-          assertEquals(time + 5, rowRecord.getFields().get(3).getLongV());
-        }
-        if (count == 100) {
-          count = 0;
-          index++;
-        }
+        assertEquals(device, rowRecord.getFields().get(0).getBinaryV().toString());
+        assertEquals(time + 1, rowRecord.getFields().get(1).getLongV());
+        assertEquals(time + 2, rowRecord.getFields().get(2).getLongV());
+        assertEquals("null", rowRecord.getFields().get(3).getStringValue());
         dataSet.next();
       }
 
@@ -164,6 +149,8 @@ public class IoTDBSessionVectorABDeviceIT {
     }
   }
 
+  // TODO: remove ignore after implementing aggregation
+  @Ignore
   @Test
   public void vectorAggregationAlignByDeviceTest() {
     try {