You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/06 06:30:18 UTC

[iotdb] branch master updated: [IOTDB-3722] Extend Fill function (#6594)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 744fd52bbf [IOTDB-3722] Extend Fill function (#6594)
744fd52bbf is described below

commit 744fd52bbfa7c1603d8aefd690f6adb990f98806
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Jul 6 14:30:12 2022 +0800

    [IOTDB-3722] Extend Fill function (#6594)
---
 .../iotdb/db/it/query/IoTDBNullValueFillIT.java    | 239 ++++++++-------------
 .../operator/process/LinearFillOperator.java       |  36 +++-
 .../operator/process/fill/ILinearFill.java         |  57 +++++
 .../identity/IdentityFill.java}                    |  20 +-
 .../identity/IdentityLinearFill.java}              |  23 +-
 .../process/fill/linear/DoubleLinearFill.java      |   5 -
 .../process/fill/linear/FloatLinearFill.java       |   5 -
 .../process/fill/linear/IntLinearFill.java         |   5 -
 .../operator/process/fill/linear/LinearFill.java   |  81 +++----
 .../process/fill/linear/LongLinearFill.java        |   5 -
 .../operator/process/merge/AscTimeComparator.java  |   5 -
 .../operator/process/merge/DescTimeComparator.java |   5 -
 .../operator/process/merge/TimeComparator.java     |   5 -
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  31 ---
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  34 +--
 .../execution/operator/LinearFillOperatorTest.java | 152 ++++++++++---
 16 files changed, 360 insertions(+), 348 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
index 46f4f9e5b7..5437854454 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
-import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail;
 import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
 import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
 import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualWithDescOrderTest;
@@ -93,6 +92,20 @@ public class IoTDBNullValueFillIT {
         "insert into root.sg1.d2(time, s2, s4, s6) values(9, 9, 9.0, 't9')"
       };
 
+  private final String[] expectedHeader =
+      new String[] {
+        "Time",
+        "root.sg1.d1.s1",
+        "root.sg1.d1.s2",
+        "root.sg1.d1.s3",
+        "root.sg1.d1.s4",
+        "root.sg1.d1.s5",
+        "root.sg1.d1.s6"
+      };
+
+  private final String[] expectedAlignByDeviceHeader =
+      new String[] {"Time", "Device", "s1", "s2", "s3", "s4", "s5", "s6"};
+
   @BeforeClass
   public static void setUp() throws Exception {
     EnvFactory.getEnv().initBeforeClass();
@@ -106,16 +119,6 @@ public class IoTDBNullValueFillIT {
 
   @Test
   public void previousFillTest() {
-    String[] expectedHeader =
-        new String[] {
-          "Time",
-          "root.sg1.d1.s1",
-          "root.sg1.d1.s2",
-          "root.sg1.d1.s3",
-          "root.sg1.d1.s4",
-          "root.sg1.d1.s5",
-          "root.sg1.d1.s6"
-        };
     String[] retArray =
         new String[] {
           "1,null,1,null,1.0,null,t1,",
@@ -133,16 +136,6 @@ public class IoTDBNullValueFillIT {
 
   @Test
   public void previousDescFillTest() {
-    String[] expectedHeader =
-        new String[] {
-          "Time",
-          "root.sg1.d1.s1",
-          "root.sg1.d1.s2",
-          "root.sg1.d1.s3",
-          "root.sg1.d1.s4",
-          "root.sg1.d1.s5",
-          "root.sg1.d1.s6"
-        };
     String[] retArray =
         new String[] {
           "9,9,null,9.0,null,true,null,",
@@ -162,7 +155,6 @@ public class IoTDBNullValueFillIT {
 
   @Test
   public void previousFillAlignByDeviceTest() {
-    String[] expectedHeader = new String[] {"Time", "Device", "s1", "s2", "s3", "s4", "s5", "s6"};
     String[] retArray =
         new String[] {
           "1,root.sg1.d1,null,1,null,1.0,null,t1,",
@@ -184,13 +176,12 @@ public class IoTDBNullValueFillIT {
         };
     resultSetEqualTest(
         "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) align by device",
-        expectedHeader,
+        expectedAlignByDeviceHeader,
         retArray);
   }
 
   @Test
   public void previousDescFillAlignByDeviceTest() {
-    String[] expectedHeader = new String[] {"Time", "Device", "s1", "s2", "s3", "s4", "s5", "s6"};
     String[] retArray =
         new String[] {
           "9,root.sg1.d1,9,null,9.0,null,true,null,",
@@ -212,181 +203,119 @@ public class IoTDBNullValueFillIT {
         };
     resultSetEqualTest(
         "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) order by time desc align by device",
-        expectedHeader,
+        expectedAlignByDeviceHeader,
         retArray);
   }
 
   @Test
   public void linearFillTest() {
-    String[] expectedHeader =
-        new String[] {
-          "Time", "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4"
-        };
     String[] retArray =
         new String[] {
-          "1,null,1,null,1.0,",
-          "2,2,2,2.0,2.0,",
-          "3,3,3,3.0,3.0,",
-          "4,4,4,4.0,4.0,",
-          "5,5,5,5.0,5.0,",
-          "6,6,6,6.5,6.0,",
-          "8,8,8,8.0,8.0,",
-          "9,9,null,9.0,null,"
+          "1,null,1,null,1.0,null,t1,",
+          "2,2,2,2.0,2.0,true,t2,",
+          "3,3,3,3.0,3.0,false,null,",
+          "4,4,4,4.0,4.0,null,t4,",
+          "5,5,5,5.0,5.0,false,t5,",
+          "6,6,6,6.5,6.0,null,t6,",
+          "8,8,8,8.0,8.0,true,t8,",
+          "9,9,null,9.0,null,true,null,"
         };
     resultSetEqualWithDescOrderTest(
-        "select s1, s2, s3, s4 from root.sg1.d1 fill(linear)", expectedHeader, retArray);
+        "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(linear)", expectedHeader, retArray);
   }
 
   @Test
   public void linearFillAlignByDeviceTest() {
-    assertTestFail(
-        "select s1, s2, s3, s4 from root.sg1.d1 fill(linear) align by device",
-        "Linear fill is not supported in align by device query yet.");
-  }
-
-  @Test
-  public void linearFillDataTypeMisMatchTest() {
-    assertTestFail(
-        "select s1, s5 from root.sg1.d1 fill(linear)",
-        "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support linear fill.");
-    assertTestFail(
-        "select s1, s6 from root.sg1.d1 fill(linear)",
-        "Data type mismatch: column 'root.sg1.d1.s6' (dataType 'TEXT') doesn't support linear fill.");
+    String[] retArray =
+        new String[] {
+          "1,root.sg1.d1,null,1,null,1.0,null,t1,",
+          "2,root.sg1.d1,2,2,2.0,2.0,true,t2,",
+          "3,root.sg1.d1,3,3,3.0,3.0,false,null,",
+          "4,root.sg1.d1,4,4,4.0,4.0,null,t4,",
+          "5,root.sg1.d1,5,5,5.0,5.0,false,t5,",
+          "6,root.sg1.d1,6,6,6.5,6.0,null,t6,",
+          "8,root.sg1.d1,8,8,8.0,8.0,true,t8,",
+          "9,root.sg1.d1,9,5,9.0,5.0,true,null,",
+          "1,root.sg1.d2,1,5,1.0,5.0,true,null,",
+          "2,root.sg1.d2,2,2,2.0,2.0,true,t2,",
+          "3,root.sg1.d2,3,3,3.0,3.0,null,t3,",
+          "4,root.sg1.d2,4,4,4.0,4.0,false,null,",
+          "5,root.sg1.d2,5,5,5.0,5.0,false,t5,",
+          "6,root.sg1.d2,6,6,6.0,6.5,false,null,",
+          "8,root.sg1.d2,8,8,8.0,8.0,true,t8,",
+          "9,root.sg1.d2,null,9,null,9.0,null,t9,"
+        };
+    resultSetEqualTest(
+        "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(linear) align by device",
+        expectedAlignByDeviceHeader,
+        retArray);
   }
 
   @Test
   public void intFillTest() {
-    String[] expectedHeader =
-        new String[] {
-          "Time",
-          "root.sg1.d1.s1",
-          "root.sg1.d1.s2",
-          "root.sg1.d1.s3",
-          "root.sg1.d1.s4",
-          "root.sg1.d1.s6"
-        };
     String[] retArray =
         new String[] {
-          "1,1000,1,1000.0,1.0,t1,",
-          "2,2,2,2.0,2.0,t2,",
-          "3,3,1000,3.0,1000.0,1000,",
-          "4,1000,4,1000.0,4.0,t4,",
-          "5,5,5,5.0,5.0,t5,",
-          "6,1000,6,1000.0,6.0,t6,",
-          "8,8,8,8.0,8.0,t8,",
-          "9,9,1000,9.0,1000.0,1000,"
+          "1,1000,1,1000.0,1.0,null,t1,",
+          "2,2,2,2.0,2.0,true,t2,",
+          "3,3,1000,3.0,1000.0,false,1000,",
+          "4,1000,4,1000.0,4.0,null,t4,",
+          "5,5,5,5.0,5.0,false,t5,",
+          "6,1000,6,1000.0,6.0,null,t6,",
+          "8,8,8,8.0,8.0,true,t8,",
+          "9,9,1000,9.0,1000.0,true,1000,"
         };
     resultSetEqualWithDescOrderTest(
-        "select s1, s2, s3, s4, s6 from root.sg1.d1 fill(1000)", expectedHeader, retArray);
-  }
-
-  @Test
-  public void intFillDataTypeMisMatchTest() {
-    assertTestFail(
-        "select s1, s5 from root.sg1.d1 fill(1000)",
-        "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support fill with '1000' (dataType 'INT64').");
+        "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(1000)", expectedHeader, retArray);
   }
 
   @Test
   public void floatFillTest() {
-    String[] expectedHeader =
-        new String[] {"Time", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s6"};
     String[] retArray =
         new String[] {
-          "1,3.14,1.0,t1,",
-          "2,2.0,2.0,t2,",
-          "3,3.0,3.14,3.14,",
-          "4,3.14,4.0,t4,",
-          "5,5.0,5.0,t5,",
-          "6,3.14,6.0,t6,",
-          "8,8.0,8.0,t8,",
-          "9,9.0,3.14,3.14,"
+          "1,null,1,3.14,1.0,null,t1,",
+          "2,2,2,2.0,2.0,true,t2,",
+          "3,3,null,3.0,3.14,false,3.14,",
+          "4,null,4,3.14,4.0,null,t4,",
+          "5,5,5,5.0,5.0,false,t5,",
+          "6,null,6,3.14,6.0,null,t6,",
+          "8,8,8,8.0,8.0,true,t8,",
+          "9,9,null,9.0,3.14,true,3.14,"
         };
     resultSetEqualWithDescOrderTest(
-        "select s3, s4, s6 from root.sg1.d1 fill(3.14)", expectedHeader, retArray);
-  }
-
-  @Test
-  public void floatFillDataTypeMisMatchTest() {
-    assertTestFail(
-        "select s1, s3 from root.sg1.d1 fill(3.14)",
-        "Data type mismatch: column 'root.sg1.d1.s1' (dataType 'INT32') doesn't support fill with '3.14' (dataType 'DOUBLE').");
-    assertTestFail(
-        "select s2, s3 from root.sg1.d1 fill(3.14)",
-        "Data type mismatch: column 'root.sg1.d1.s2' (dataType 'INT64') doesn't support fill with '3.14' (dataType 'DOUBLE').");
-    assertTestFail(
-        "select s5, s3 from root.sg1.d1 fill(3.14)",
-        "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support fill with '3.14' (dataType 'DOUBLE').");
+        "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(3.14)", expectedHeader, retArray);
   }
 
   @Test
   public void booleanFillTest() {
-    String[] expectedHeader = new String[] {"Time", "root.sg1.d1.s5", "root.sg1.d1.s6"};
     String[] retArray =
         new String[] {
-          "1,true,t1,",
-          "2,true,t2,",
-          "3,false,true,",
-          "4,true,t4,",
-          "5,false,t5,",
-          "6,true,t6,",
-          "8,true,t8,",
-          "9,true,true,"
+          "1,null,1,null,1.0,true,t1,",
+          "2,2,2,2.0,2.0,true,t2,",
+          "3,3,null,3.0,null,false,true,",
+          "4,null,4,null,4.0,true,t4,",
+          "5,5,5,5.0,5.0,false,t5,",
+          "6,null,6,null,6.0,true,t6,",
+          "8,8,8,8.0,8.0,true,t8,",
+          "9,9,null,9.0,null,true,true,"
         };
     resultSetEqualWithDescOrderTest(
-        "select s5, s6 from root.sg1.d1 fill(true)", expectedHeader, retArray);
-  }
-
-  @Test
-  public void booleanFillDataTypeMisMatchTest() {
-    assertTestFail(
-        "select s5, s1 from root.sg1.d1 fill(true)",
-        "Data type mismatch: column 'root.sg1.d1.s1' (dataType 'INT32') doesn't support fill with 'true' (dataType 'BOOLEAN').");
-    assertTestFail(
-        "select s5, s2 from root.sg1.d1 fill(true)",
-        "Data type mismatch: column 'root.sg1.d1.s2' (dataType 'INT64') doesn't support fill with 'true' (dataType 'BOOLEAN').");
-    assertTestFail(
-        "select s5, s3 from root.sg1.d1 fill(true)",
-        "Data type mismatch: column 'root.sg1.d1.s3' (dataType 'FLOAT') doesn't support fill with 'true' (dataType 'BOOLEAN').");
-    assertTestFail(
-        "select s5, s4 from root.sg1.d1 fill(true)",
-        "Data type mismatch: column 'root.sg1.d1.s4' (dataType 'DOUBLE') doesn't support fill with 'true' (dataType 'BOOLEAN').");
+        "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(true)", expectedHeader, retArray);
   }
 
   @Test
   public void textFillTest() {
-    String[] expectedHeader = new String[] {"Time", "root.sg1.d1.s6", "root.sg1.d2.s6"};
     String[] retArray =
         new String[] {
-          "1,t1,t0,",
-          "2,t2,t2,",
-          "3,t0,t3,",
-          "4,t4,t0,",
-          "5,t5,t5,",
-          "6,t6,t0,",
-          "8,t8,t8,",
-          "9,t0,t9,"
+          "1,null,1,null,1.0,null,t1,",
+          "2,2,2,2.0,2.0,true,t2,",
+          "3,3,null,3.0,null,false,t0,",
+          "4,null,4,null,4.0,null,t4,",
+          "5,5,5,5.0,5.0,false,t5,",
+          "6,null,6,null,6.0,null,t6,",
+          "8,8,8,8.0,8.0,true,t8,",
+          "9,9,null,9.0,null,true,t0,"
         };
     resultSetEqualWithDescOrderTest(
-        "select s6 from root.sg1.d1, root.sg1.d2 fill('t0')", expectedHeader, retArray);
-  }
-
-  @Test
-  public void textFillDataTypeMisMatchTest() {
-    assertTestFail(
-        "select s6, s1 from root.sg1.d1 fill('t0')",
-        "Data type mismatch: column 'root.sg1.d1.s1' (dataType 'INT32') doesn't support fill with 't0' (dataType 'TEXT').");
-    assertTestFail(
-        "select s6, s2 from root.sg1.d1 fill('t0')",
-        "Data type mismatch: column 'root.sg1.d1.s2' (dataType 'INT64') doesn't support fill with 't0' (dataType 'TEXT').");
-    assertTestFail(
-        "select s6, s3 from root.sg1.d1 fill('t0')",
-        "Data type mismatch: column 'root.sg1.d1.s3' (dataType 'FLOAT') doesn't support fill with 't0' (dataType 'TEXT').");
-    assertTestFail(
-        "select s6, s4 from root.sg1.d1 fill('t0')",
-        "Data type mismatch: column 'root.sg1.d1.s4' (dataType 'DOUBLE') doesn't support fill with 't0' (dataType 'TEXT').");
-    assertTestFail(
-        "select s6, s5 from root.sg1.d1 fill('t0')",
-        "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support fill with 't0' (dataType 'TEXT').");
+        "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill('t0')", expectedHeader, retArray);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 8193e20684..fcae23ce7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
@@ -37,11 +37,15 @@ import static java.util.Objects.requireNonNull;
 public class LinearFillOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
-  private final LinearFill[] fillArray;
+  private final ILinearFill[] fillArray;
   private final Operator child;
   private final int outputColumnCount;
   // TODO need to spill it to disk if it consumes too much memory
   private final List<TsBlock> cachedTsBlock;
+
+  private final List<Long> cachedRowIndex;
+
+  private long currentRowIndex = 0;
   // next TsBlock Index for each Column
   private final int[] nextTsBlockIndex;
 
@@ -52,7 +56,7 @@ public class LinearFillOperator implements ProcessOperator {
   private boolean noMoreTsBlock;
 
   public LinearFillOperator(
-      OperatorContext operatorContext, LinearFill[] fillArray, Operator child) {
+      OperatorContext operatorContext, ILinearFill[] fillArray, Operator child) {
     this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
     checkArgument(
         fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
@@ -60,6 +64,7 @@ public class LinearFillOperator implements ProcessOperator {
     this.child = requireNonNull(child, "child operator is null");
     this.outputColumnCount = fillArray.length;
     this.cachedTsBlock = new ArrayList<>();
+    this.cachedRowIndex = new ArrayList<>();
     this.nextTsBlockIndex = new int[outputColumnCount];
     Arrays.fill(this.nextTsBlockIndex, 1);
     this.canCallNext = false;
@@ -88,19 +93,21 @@ public class LinearFillOperator implements ProcessOperator {
         return nextTsBlock;
       } else { // otherwise, we cache it
         cachedTsBlock.add(nextTsBlock);
+        cachedRowIndex.add(currentRowIndex);
+        currentRowIndex += nextTsBlock.getPositionCount();
       }
     }
 
     TsBlock originTsBlock = cachedTsBlock.get(0);
-    long currentEndTime = originTsBlock.getEndTime();
+    long currentEndRowIndex = cachedRowIndex.get(0) + originTsBlock.getPositionCount() - 1;
     // Step 1: judge whether we can fill current TsBlock, if TsBlock that we can get is not enough,
     // we just return null
     for (int columnIndex = 0; columnIndex < outputColumnCount; columnIndex++) {
       // current valueColumn can't be filled using current information
       if (fillArray[columnIndex].needPrepareForNext(
-          currentEndTime, originTsBlock.getColumn(columnIndex))) {
+          currentEndRowIndex, originTsBlock.getColumn(columnIndex))) {
         // current cached TsBlock is not enough to fill this column
-        while (!isCachedTsBlockEnough(columnIndex, currentEndTime)) {
+        while (!isCachedTsBlockEnough(columnIndex, currentEndRowIndex)) {
           // if we failed to get next TsBlock
           if (!tryToGetNextTsBlock()) {
             // there is no more TsBlock, so we have to fill this Column
@@ -118,9 +125,12 @@ public class LinearFillOperator implements ProcessOperator {
     }
     // Step 2: fill current TsBlock
     originTsBlock = cachedTsBlock.remove(0);
+    long startRowIndex = cachedRowIndex.remove(0);
     Column[] columns = new Column[outputColumnCount];
     for (int i = 0; i < outputColumnCount; i++) {
-      columns[i] = fillArray[i].fill(originTsBlock.getTimeColumn(), originTsBlock.getColumn(i));
+      columns[i] =
+          fillArray[i].fill(
+              originTsBlock.getTimeColumn(), originTsBlock.getColumn(i), startRowIndex);
     }
     TsBlock result =
         new TsBlock(originTsBlock.getPositionCount(), originTsBlock.getTimeColumn(), columns);
@@ -154,17 +164,21 @@ public class LinearFillOperator implements ProcessOperator {
    * Judge whether we can use current cached TsBlock to fill Column
    *
    * @param columnIndex index for column which need to be filled
-   * @param currentEndTime endTime of column which need to be filled
+   * @param currentEndRowIndex row index for endTime of column which needs to be filled
    * @return true if current cached TsBlock is enough to fill Column at columnIndex, otherwise
    *     false.
    */
-  private boolean isCachedTsBlockEnough(int columnIndex, long currentEndTime) {
+  private boolean isCachedTsBlockEnough(int columnIndex, long currentEndRowIndex) {
     // next TsBlock has already been in the cachedTsBlock
     while (nextTsBlockIndex[columnIndex] < cachedTsBlock.size()) {
       TsBlock nextTsBlock = cachedTsBlock.get(nextTsBlockIndex[columnIndex]);
+      long startRowIndex = cachedRowIndex.get(nextTsBlockIndex[columnIndex]);
       nextTsBlockIndex[columnIndex]++;
       if (fillArray[columnIndex].prepareForNext(
-          currentEndTime, nextTsBlock.getTimeColumn(), nextTsBlock.getColumn(columnIndex))) {
+          startRowIndex,
+          currentEndRowIndex,
+          nextTsBlock.getTimeColumn(),
+          nextTsBlock.getColumn(columnIndex))) {
         return true;
       }
     }
@@ -184,6 +198,8 @@ public class LinearFillOperator implements ProcessOperator {
         return false;
       } else { // otherwise, we cache it
         cachedTsBlock.add(nextTsBlock);
+        cachedRowIndex.add(currentRowIndex);
+        currentRowIndex += nextTsBlock.getPositionCount();
         return true;
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
new file mode 100644
index 0000000000..d0edfc5961
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public interface ILinearFill {
+
+  /**
+   * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
+   * has been set to true
+   *
+   * @param timeColumn TimeColumn of valueColumn
+   * @param valueColumn valueColumn that needs to be filled
+   * @param currentRowIndex current row index for start time in timeColumn
+   * @return Value Column that has been filled
+   */
+  Column fill(TimeColumn timeColumn, Column valueColumn, long currentRowIndex);
+
+  /**
+   * @param rowIndex row index for end time of current valueColumn that needs to be filled
+   * @param valueColumn valueColumn that needs to be filled
+   * @return true if valueColumn can't be filled using current information, and we need to get next
+   *     TsBlock and then call prepareForNext. false if valueColumn can be filled using current
+   *     information, and we can directly call fill() function
+   */
+  boolean needPrepareForNext(long rowIndex, Column valueColumn);
+
+  /**
+   * @param startRowIndex row index for start time of nextValueColumn
+   * @param endRowIndex row index for end time of current valueColumn that needs to be filled
+   * @param nextTimeColumn TimeColumn of next TsBlock
+   * @param nextValueColumn Value Column of next TsBlock
+   * @return true if we get enough information to fill current column, and we can stop getting next
+   *     TsBlock and calling prepareForNext. false if we still don't get enough information to fill
+   *     current column, and still need to keep getting next TsBlock and then call prepareForNext
+   */
+  boolean prepareForNext(
+      long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column nextValueColumn);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
index 4aa723ad97..25a5264bb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
@@ -16,23 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity;
 
-public class AscTimeComparator implements TimeComparator {
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-  /** @return if order by time asc, return true if time <= endTime, otherwise false */
-  @Override
-  public boolean satisfyCurEndTime(long time, long endTime) {
-    return time <= endTime;
-  }
-
-  @Override
-  public long getCurrentEndTime(long time1, long time2) {
-    return Math.min(time1, time2);
-  }
+public class IdentityFill implements IFill {
 
   @Override
-  public boolean inFillBound(long time, long timeBound) {
-    return time < timeBound;
+  public Column fill(Column valueColumn) {
+    return valueColumn;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
index 4aa723ad97..34f329baf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
@@ -16,23 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity;
 
-public class AscTimeComparator implements TimeComparator {
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class IdentityLinearFill implements ILinearFill {
 
-  /** @return if order by time asc, return true if time <= endTime, otherwise false */
   @Override
-  public boolean satisfyCurEndTime(long time, long endTime) {
-    return time <= endTime;
+  public Column fill(TimeColumn timeColumn, Column valueColumn, long currentRowIndex) {
+    return valueColumn;
   }
 
   @Override
-  public long getCurrentEndTime(long time1, long time2) {
-    return Math.min(time1, time2);
+  public boolean needPrepareForNext(long rowIndex, Column valueColumn) {
+    return false;
   }
 
   @Override
-  public boolean inFillBound(long time, long timeBound) {
-    return time < timeBound;
+  public boolean prepareForNext(
+      long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column nextValueColumn) {
+    throw new IllegalArgumentException(
+        "We won't call prepareForNext in IdentityLinearFill, because needPrepareForNext() method will always return false.");
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
index ca8f558647..52a228b1e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -34,10 +33,6 @@ public class DoubleLinearFill extends LinearFill {
 
   private double nextValueInCurrentColumn;
 
-  public DoubleLinearFill(boolean ascending, TimeComparator timeComparator) {
-    super(ascending, timeComparator);
-  }
-
   @Override
   void fillValue(Column column, int index, Object array) {
     ((double[]) array)[index] = column.getDouble(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
index 803c55ec8f..a32c60d61e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -34,10 +33,6 @@ public class FloatLinearFill extends LinearFill {
 
   private float nextValueInCurrentColumn;
 
-  public FloatLinearFill(boolean ascending, TimeComparator timeComparator) {
-    super(ascending, timeComparator);
-  }
-
   @Override
   void fillValue(Column column, int index, Object array) {
     ((float[]) array)[index] = column.getFloat(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
index 4876ed23b8..baf4806551 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -34,10 +33,6 @@ public class IntLinearFill extends LinearFill {
 
   private int nextValueInCurrentColumn;
 
-  public IntLinearFill(boolean ascending, TimeComparator timeComparator) {
-    super(ascending, timeComparator);
-  }
-
   @Override
   void fillValue(Column column, int index, Object array) {
     ((int[]) array)[index] = column.getInt(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index 92685b9c38..238fb91137 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -31,32 +31,18 @@ import static com.google.common.base.Preconditions.checkArgument;
  * the closest timestamp after T. Linear Fill function calculation only supports numeric types
  * including long, int, double and float.
  */
-public abstract class LinearFill {
-
-  private final TimeComparator timeComparator;
+public abstract class LinearFill implements ILinearFill {
 
   // whether previous value is null
   protected boolean previousIsNull = true;
-  // time of next value
-  protected long nextTime;
-
-  protected long nextTimeInCurrentColumn;
 
-  public LinearFill(boolean ascending, TimeComparator timeComparator) {
-    this.nextTime = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
-    this.nextTimeInCurrentColumn = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
-    this.timeComparator = timeComparator;
-  }
+  // row index of the next non-null value
+  private long nextRowIndex = -1;
+  // row index of the next non-null value for the current column (falls back to nextRowIndex)
+  private long nextRowIndexInCurrentColumn = -1;
 
-  /**
-   * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
-   * has been set to true
-   *
-   * @param timeColumn TimeColumn of valueColumn
-   * @param valueColumn valueColumn that need to be filled
-   * @return Value Column that has been filled
-   */
-  public Column fill(TimeColumn timeColumn, Column valueColumn) {
+  @Override
+  public Column fill(TimeColumn timeColumn, Column valueColumn, long startRowIndex) {
     int size = valueColumn.getPositionCount();
     // if this valueColumn is empty, just return itself;
     if (size == 0) {
@@ -74,11 +60,13 @@ public abstract class LinearFill {
     // if its values are all null
     if (valueColumn instanceof RunLengthEncodedColumn) {
       // previous value is null or next value is null, we just return NULL_VALUE_BLOCK
-      if (previousIsNull || timeComparator.inFillBound(nextTime, timeColumn.getStartTime())) {
+      if (previousIsNull || nextRowIndex < startRowIndex) {
         return new RunLengthEncodedColumn(createNullValueColumn(), size);
       } else {
         prepareForNextValueInCurrentColumn(
-            timeColumn.getEndTime(), timeColumn.getPositionCount() - 1, timeColumn, valueColumn);
+            startRowIndex + timeColumn.getPositionCount() - 1,
+            timeColumn.getPositionCount(),
+            valueColumn);
         return new RunLengthEncodedColumn(createFilledValueColumn(), size);
       }
     } else {
@@ -90,10 +78,10 @@ public abstract class LinearFill {
       for (int i = 0; i < size; i++) {
         // current value is null, we need to fill it
         if (valueColumn.isNull(i)) {
-          long currentTime = timeColumn.getLong(i);
-          prepareForNextValueInCurrentColumn(currentTime, i + 1, timeColumn, valueColumn);
+          long currentRowIndex = startRowIndex + i;
+          prepareForNextValueInCurrentColumn(currentRowIndex, i + 1, valueColumn);
           // we don't fill it, if either previous value or next value is null
-          if (previousIsNull || nextIsNull(currentTime)) {
+          if (previousIsNull || nextIsNull(currentRowIndex)) {
             isNull[i] = true;
             hasNullValue = true;
           } else {
@@ -113,63 +101,56 @@ public abstract class LinearFill {
   }
 
   /**
-   * @param time end time of current valueColumn that need to be filled
+   * @param rowIndex end row index of current valueColumn that needs to be filled
    * @param valueColumn valueColumn that need to be filled
    * @return true if valueColumn can't be filled using current information, and we need to get next
    *     TsBlock and then call prepareForNext. false if valueColumn can be filled using current
    *     information, and we can directly call fill() function
    */
-  public boolean needPrepareForNext(long time, Column valueColumn) {
-    return timeComparator.inFillBound(nextTime, time)
-        && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+  @Override
+  public boolean needPrepareForNext(long rowIndex, Column valueColumn) {
+    return nextRowIndex < rowIndex && valueColumn.isNull(valueColumn.getPositionCount() - 1);
   }
 
-  /**
-   * @param time end time of current valueColumn that need to be filled
-   * @param nextTimeColumn TimeColumn of next TsBlock
-   * @param nextValueColumn Value Column of next TsBlock
-   * @return true if we get enough information to fill current column, and we can stop getting next
-   *     TsBlock and calling prepareForNext. false if we still don't get enough information to fill
-   *     current column, and still need to keep getting next TsBlock and then call prepareForNext
-   */
-  public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
+  @Override
+  public boolean prepareForNext(
+      long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column nextValueColumn) {
     checkArgument(
-        nextTimeColumn.getPositionCount() > 0
-            && timeComparator.inFillBound(time, nextTimeColumn.getLong(0)),
+        nextTimeColumn.getPositionCount() > 0 && endRowIndex < startRowIndex,
         "nextColumn's time should be greater than current time");
-    if (timeComparator.satisfyCurEndTime(time, nextTime)) {
+    if (endRowIndex <= nextRowIndex) {
       return true;
     }
 
     for (int i = 0; i < nextValueColumn.getPositionCount(); i++) {
       if (!nextValueColumn.isNull(i)) {
         updateNextValue(nextValueColumn, i);
-        this.nextTime = nextTimeColumn.getLong(i);
+        this.nextRowIndex = startRowIndex + i;
         return true;
       }
     }
     return false;
   }
 
-  private boolean nextIsNull(long time) {
-    return timeComparator.satisfyCurEndTime(nextTimeInCurrentColumn, time);
+  private boolean nextIsNull(long rowIndex) {
+    return nextRowIndexInCurrentColumn <= rowIndex;
   }
 
   private void prepareForNextValueInCurrentColumn(
-      long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
-    if (timeComparator.satisfyCurEndTime(time, nextTimeInCurrentColumn)) {
+      long currentRowIndex, int startIndex, Column valueColumn) {
+    if (currentRowIndex <= nextRowIndexInCurrentColumn) {
       return;
     }
     for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
-        this.nextTimeInCurrentColumn = timeColumn.getLong(i);
+        this.nextRowIndexInCurrentColumn = currentRowIndex + (i - startIndex + 1);
         updateNextValueInCurrentColumn(valueColumn, i);
         return;
       }
     }
 
     // current column's value is not enough for filling, we should use value of next Column
-    this.nextTimeInCurrentColumn = this.nextTime;
+    this.nextRowIndexInCurrentColumn = this.nextRowIndex;
     updateNextValueInCurrentColumn();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
index 5f6986ad2e..04dba1613a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -34,10 +33,6 @@ public class LongLinearFill extends LinearFill {
 
   private long nextValueInCurrentColumn;
 
-  public LongLinearFill(boolean ascending, TimeComparator timeComparator) {
-    super(ascending, timeComparator);
-  }
-
   @Override
   void fillValue(Column column, int index, Object array) {
     ((long[]) array)[index] = column.getLong(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
index 4aa723ad97..95b7316844 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
@@ -30,9 +30,4 @@ public class AscTimeComparator implements TimeComparator {
   public long getCurrentEndTime(long time1, long time2) {
     return Math.min(time1, time2);
   }
-
-  @Override
-  public boolean inFillBound(long time, long timeBound) {
-    return time < timeBound;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
index 495cd2852a..f53c97d8fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
@@ -30,9 +30,4 @@ public class DescTimeComparator implements TimeComparator {
   public long getCurrentEndTime(long time1, long time2) {
     return Math.max(time1, time2);
   }
-
-  @Override
-  public boolean inFillBound(long time, long timeBound) {
-    return time > timeBound;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
index a2f2c076da..db017f6186 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
@@ -25,9 +25,4 @@ public interface TimeComparator {
 
   /** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */
   long getCurrentEndTime(long time1, long time2);
-
-  /**
-   * @return true if time < timeBound && order by time asc, time > timeBound && order by time desc
-   */
-  boolean inFillBound(long time, long timeBound);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 2af7e16daa..8a9a063865 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -49,7 +49,6 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
@@ -412,36 +411,6 @@ public class Analyzer {
           FillComponent fillComponent = queryStatement.getFillComponent();
           List<Expression> fillColumnList =
               outputExpressions.stream().map(Pair::getLeft).distinct().collect(Collectors.toList());
-          if (fillComponent.getFillPolicy() == FillPolicy.VALUE) {
-            for (Expression fillColumn : fillColumnList) {
-              TSDataType checkedDataType = typeProvider.getType(fillColumn.getExpressionString());
-              if (!fillComponent.getFillValue().isDataTypeConsistency(checkedDataType)) {
-                throw new SemanticException(
-                    String.format(
-                        "Data type mismatch: column '%s' (dataType '%s') doesn't support fill with '%s' (dataType '%s').",
-                        fillColumn.getExpressionString(),
-                        checkedDataType,
-                        fillComponent.getFillValue().getBinary(),
-                        fillComponent.getFillValue().getDataTypeString()));
-              }
-            }
-          } else if (fillComponent.getFillPolicy() == FillPolicy.LINEAR) {
-            // TODO support linear fill in align by device query
-            if (queryStatement.isAlignByDevice()) {
-              throw new SemanticException(
-                  "Linear fill is not supported in align by device query yet.");
-            }
-
-            for (Expression fillColumn : fillColumnList) {
-              TSDataType checkedDataType = typeProvider.getType(fillColumn.getExpressionString());
-              if (!checkedDataType.isNumeric()) {
-                throw new SemanticException(
-                    String.format(
-                        "Data type mismatch: column '%s' (dataType '%s') doesn't support linear fill.",
-                        fillColumn.getExpressionString(), checkedDataType));
-              }
-            }
-          }
           analysis.setFillDescriptor(
               new FillDescriptor(fillComponent.getFillPolicy(), fillComponent.getFillValue()));
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index ce8afcf154..343c328eea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -59,16 +59,18 @@ import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.FloatConstantFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.IntConstantFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.LongConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.DoubleLinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.IntLinearFill;
-import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LongLinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BinaryPreviousFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BooleanPreviousFill;
@@ -180,6 +182,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
 
 /**
  * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
@@ -197,6 +200,10 @@ public class LocalExecutionPlanner {
 
   private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
 
+  private static final IdentityFill IDENTITY_FILL = new IdentityFill();
+
+  private static final IdentityLinearFill IDENTITY_LINEAR_FILL = new IdentityLinearFill();
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -664,8 +671,7 @@ public class LocalExecutionPlanner {
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   LinearFillOperator.class.getSimpleName()),
-              getLinearFill(
-                  inputColumns, inputDataTypes, node.getScanOrder() == OrderBy.TIMESTAMP_ASC),
+              getLinearFill(inputColumns, inputDataTypes),
               child);
         default:
           throw new IllegalArgumentException("Unknown fill policy: " + fillPolicy);
@@ -676,6 +682,10 @@ public class LocalExecutionPlanner {
         int inputColumns, List<TSDataType> inputDataTypes, Literal literal) {
       IFill[] constantFill = new IFill[inputColumns];
       for (int i = 0; i < inputColumns; i++) {
+        if (!literal.isDataTypeConsistency(inputDataTypes.get(i))) {
+          constantFill[i] = IDENTITY_FILL;
+          continue;
+        }
         switch (inputDataTypes.get(i)) {
           case BOOLEAN:
             constantFill[i] = new BooleanConstantFill(literal.getBoolean());
@@ -731,28 +741,26 @@ public class LocalExecutionPlanner {
       return previousFill;
     }
 
-    private LinearFill[] getLinearFill(
-        int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) {
-      LinearFill[] linearFill = new LinearFill[inputColumns];
-      TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
+    private ILinearFill[] getLinearFill(int inputColumns, List<TSDataType> inputDataTypes) {
+      ILinearFill[] linearFill = new ILinearFill[inputColumns];
       for (int i = 0; i < inputColumns; i++) {
         switch (inputDataTypes.get(i)) {
           case INT32:
-            linearFill[i] = new IntLinearFill(ascending, timeComparator);
+            linearFill[i] = new IntLinearFill();
             break;
           case INT64:
-            linearFill[i] = new LongLinearFill(ascending, timeComparator);
+            linearFill[i] = new LongLinearFill();
             break;
           case FLOAT:
-            linearFill[i] = new FloatLinearFill(ascending, timeComparator);
+            linearFill[i] = new FloatLinearFill();
             break;
           case DOUBLE:
-            linearFill[i] = new DoubleLinearFill(ascending, timeComparator);
+            linearFill[i] = new DoubleLinearFill();
             break;
           case BOOLEAN:
           case TEXT:
-            throw new UnsupportedOperationException(
-                "DataType: " + inputDataTypes.get(i) + " doesn't support linear fill.");
+            linearFill[i] = IDENTITY_LINEAR_FILL;
+            break;
           default:
             throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
         }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 9852707056..56c2fe6a1a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -25,11 +25,10 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -42,6 +41,7 @@ import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class LinearFillOperatorTest {
@@ -62,14 +62,12 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
-      boolean ascending = true;
-      TimeComparator timeComparator = new AscTimeComparator();
       LinearFill[] fillArray =
           new LinearFill[] {
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator)
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill()
           };
       LinearFillOperator fillOperator =
           new LinearFillOperator(
@@ -262,14 +260,12 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
-      boolean ascending = false;
-      TimeComparator timeComparator = new DescTimeComparator();
       LinearFill[] fillArray =
           new LinearFill[] {
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator)
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill()
           };
       LinearFillOperator fillOperator =
           new LinearFillOperator(
@@ -462,14 +458,12 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
-      boolean ascending = true;
-      TimeComparator timeComparator = new AscTimeComparator();
       LinearFill[] fillArray =
           new LinearFill[] {
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator)
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill()
           };
       LinearFillOperator fillOperator =
           new LinearFillOperator(
@@ -662,14 +656,12 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
-      boolean ascending = false;
-      TimeComparator timeComparator = new DescTimeComparator();
       LinearFill[] fillArray =
           new LinearFill[] {
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator),
-            new FloatLinearFill(ascending, timeComparator)
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill()
           };
       LinearFillOperator fillOperator =
           new LinearFillOperator(
@@ -862,8 +854,7 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
-      LinearFill[] fillArray =
-          new LinearFill[] {new FloatLinearFill(true, new AscTimeComparator())};
+      LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
       LinearFillOperator fillOperator =
           new LinearFillOperator(
               fragmentInstanceContext.getOperatorContexts().get(0),
@@ -968,8 +959,7 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
-      LinearFill[] fillArray =
-          new LinearFill[] {new FloatLinearFill(false, new DescTimeComparator())};
+      LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
       LinearFillOperator fillOperator =
           new LinearFillOperator(
               fragmentInstanceContext.getOperatorContexts().get(0),
@@ -1057,4 +1047,104 @@ public class LinearFillOperatorTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  @Test
+  public void batchLinearFillBooleanTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+      ILinearFill[] fillArray = new ILinearFill[] {new IdentityLinearFill()};
+      LinearFillOperator fillOperator =
+          new LinearFillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+                private final boolean[][][] value =
+                    new boolean[][][] {
+                      {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, {{false}}, {{true}}
+                    };
+                final boolean[][][] isNull =
+                    new boolean[][][] {
+                      {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}}
+                    };
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.BOOLEAN));
+                  for (int i = 0; i < 1; i++) {
+                    builder.getTimeColumnBuilder().writeLong(i + index);
+                    for (int j = 0; j < 1; j++) {
+                      if (isNull[index][i][j]) {
+                        builder.getColumnBuilder(j).appendNull();
+                      } else {
+                        builder.getColumnBuilder(j).writeBoolean(value[index][i][j]);
+                      }
+                    }
+                    builder.declarePosition();
+                  }
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 7;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 7;
+                }
+              });
+
+      int count = 0;
+      boolean[][][] res =
+          new boolean[][][] {
+            {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, {{false}}, {{true}}
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+            {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}}
+          };
+
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        assertNotNull(block);
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = i + count;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 1; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getBoolean(i));
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(res.length, count);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }