You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/08 04:39:14 UTC

[iotdb] branch FillOperator updated: done

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/FillOperator by this push:
     new 72dd5b03fd done
72dd5b03fd is described below

commit 72dd5b03fd7fb304da9a5a38213a2a262148ae3d
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun May 8 12:38:44 2022 +0800

    done
---
 .../operator/process/LinearFillOperator.java       |  84 ++--
 .../operator/process/fill/linear/LinearFill.java   |  14 +-
 .../mpp/execution/operator/FillOperatorTest.java   | 353 +++++++++++++++++
 .../execution/operator/LinearFillOperatorTest.java | 441 +++++++++++++++++++++
 4 files changed, 857 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 3d124189f7..f2f713f174 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -26,8 +26,9 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
@@ -40,10 +41,19 @@ public class LinearFillOperator implements ProcessOperator {
   private final Operator child;
   private final int outputColumnCount;
   // TODO need to spill it to disk if it consumes too much memory
-  private final LinkedList<TsBlock> cachedTsBlock;
+  private final List<TsBlock> cachedTsBlock;
+  // next TsBlock Index for each Column
+  private final int[] nextTsBlockIndex;
+  // cache already filled column in last unfinished filling
   private final Column[] cachedFilledValueColumns;
 
+  // indicate which column we are filling
   private int currentFilledColumnIndex;
+  // indicate whether we can call child.next()
+  // it's used to make sure that child.next() will only be called once in LinearFillOperator.next();
+  private boolean canCallNext;
+  // indicate whether there is more TsBlock for child operator
+  private boolean noMoreTsBlock;
 
   public LinearFillOperator(
       OperatorContext operatorContext, LinearFill[] fillArray, Operator child) {
@@ -53,9 +63,13 @@ public class LinearFillOperator implements ProcessOperator {
     this.fillArray = fillArray;
     this.child = requireNonNull(child, "child operator is null");
     this.outputColumnCount = fillArray.length;
-    this.cachedTsBlock = new LinkedList<>();
+    this.cachedTsBlock = new ArrayList<>();
+    this.nextTsBlockIndex = new int[outputColumnCount];
+    Arrays.fill(this.nextTsBlockIndex, 1);
     this.cachedFilledValueColumns = new Column[outputColumnCount];
     this.currentFilledColumnIndex = 0;
+    this.canCallNext = false;
+    this.noMoreTsBlock = true;
   }
 
   @Override
@@ -72,49 +86,64 @@ public class LinearFillOperator implements ProcessOperator {
   public TsBlock next() {
 
     // make sure we call child.next() at most once
-    boolean alreadyCallNext = false;
     if (cachedTsBlock.isEmpty()) {
-      alreadyCallNext = true;
+      canCallNext = false;
       TsBlock nextBlock = child.next();
       // child operator's calculation is not finished, so we just return null
       if (nextBlock == null || nextBlock.isEmpty()) {
         return nextBlock;
       } else { // otherwise, we cache it
-        cachedTsBlock.addLast(nextBlock);
+        cachedTsBlock.add(nextBlock);
       }
     }
 
-    TsBlock block = cachedTsBlock.getFirst();
+    TsBlock block = cachedTsBlock.get(0);
     long currentEndTime = block.getEndTime();
     // use cached TsBlock to keep filling remaining column
     while (currentFilledColumnIndex < outputColumnCount) {
+      // current valueColumn can't be filled using current information
       if (fillArray[currentFilledColumnIndex].needPrepareForNext(
           currentEndTime, block.getColumn(currentFilledColumnIndex))) {
-        if (!alreadyCallNext) {
-          alreadyCallNext = true;
+
+        if (canCallNext) { // if we can call child.next(), we call that and cache it in
+          // cachedTsBlock
+          canCallNext = false;
           TsBlock nextBlock = child.next();
           // child operator's calculation is not finished, so we just return null
           if (nextBlock == null || nextBlock.isEmpty()) {
             return nextBlock;
           } else { // otherwise, we cache it
-            cachedTsBlock.addLast(nextBlock);
+            cachedTsBlock.add(nextBlock);
           }
+        }
+
+        // next TsBlock has already been in the cachedTsBlock
+        while (nextTsBlockIndex[currentFilledColumnIndex] < cachedTsBlock.size()) {
+          TsBlock nextTsBlock = cachedTsBlock.get(nextTsBlockIndex[currentFilledColumnIndex]);
+          nextTsBlockIndex[currentFilledColumnIndex]++;
           if (fillArray[currentFilledColumnIndex].prepareForNext(
               currentEndTime,
-              cachedTsBlock.getLast().getTimeColumn(),
-              cachedTsBlock.getLast().getColumn(currentFilledColumnIndex))) {
+              nextTsBlock.getTimeColumn(),
+              nextTsBlock.getColumn(currentFilledColumnIndex))) {
+            cachedFilledValueColumns[currentFilledColumnIndex] =
+                fillArray[currentFilledColumnIndex].fill(
+                    block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+            break;
+          }
+        }
+
+        // current column's filling is not finished using current owning TsBlock
+        if (cachedFilledValueColumns[currentFilledColumnIndex] == null) {
+          if (noMoreTsBlock) { // there is no more TsBlock, just use current owing TsBlock to fill
             cachedFilledValueColumns[currentFilledColumnIndex] =
                 fillArray[currentFilledColumnIndex].fill(
                     block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
-          } else { // more TsBlocks is needed to do current column's fill, so current calculation is
-            // not finished, and we just return null
+          } else { // next TsBlock is not ready, more TsBlocks is needed to do current column's
+            // fill, so current calculation is not finished, and we just return null
             return null;
           }
-        } else { // more TsBlocks is needed to do current column's fill, so current calculation is
-          // not finished, and we just return null
-          return null;
         }
-      } else {
+      } else { // current valueColumn can be filled using current information
         cachedFilledValueColumns[currentFilledColumnIndex] =
             fillArray[currentFilledColumnIndex].fill(
                 block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
@@ -122,29 +151,30 @@ public class LinearFillOperator implements ProcessOperator {
       currentFilledColumnIndex++;
     }
 
-    TsBlock originTsBlock = cachedTsBlock.removeFirst();
+    TsBlock originTsBlock = cachedTsBlock.remove(0);
     checkArgument(
         outputColumnCount == originTsBlock.getValueColumnCount(),
         "outputColumnCount is not equal to value column count of child operator's TsBlock");
     TsBlock result =
-        TsBlock.wrapBlocksWithoutCopy(
+        new TsBlock(
             originTsBlock.getPositionCount(),
             originTsBlock.getTimeColumn(),
             cachedFilledValueColumns);
     Arrays.fill(cachedFilledValueColumns, null);
+    for (int i = 0; i < outputColumnCount; i++) {
+      nextTsBlockIndex[i]--;
+    }
     currentFilledColumnIndex = 0;
     return result;
   }
 
   @Override
   public boolean hasNext() {
-    boolean hasNext = child.hasNext();
-    if (!hasNext) {
-      for (LinearFill linearFill : fillArray) {
-        linearFill.setNoMoreData();
-      }
-    }
-    return !cachedTsBlock.isEmpty() || hasNext;
+    // if child.hasNext() return false, it means that there is no more tsBlocks
+    noMoreTsBlock = !child.hasNext();
+    // if there is more tsBlock, we can call child.next() once
+    canCallNext = !noMoreTsBlock;
+    return !cachedTsBlock.isEmpty() || !noMoreTsBlock;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index d06a679d2d..f7c3576ec4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 public abstract class LinearFill {
 
   // whether previous value is null
@@ -30,8 +32,6 @@ public abstract class LinearFill {
   protected long nextTime = Long.MIN_VALUE;
 
   protected long nextTimeInCurrentColumn;
-  // whether next value is null
-  protected boolean noMoreNext = false;
 
   /**
    * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
@@ -102,7 +102,7 @@ public abstract class LinearFill {
    *     information, and we can directly call fill() function
    */
   public boolean needPrepareForNext(long time, Column valueColumn) {
-    return !noMoreNext && time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+    return time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1);
   }
 
   /**
@@ -114,6 +114,9 @@ public abstract class LinearFill {
    *     current column, and still need to keep getting next TsBlock and then call prepareForNext
    */
   public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
+    checkArgument(
+        nextTimeColumn.getPositionCount() > 0 && nextTimeColumn.getLong(0) > time,
+        "nextColumn's time should be greater than current time");
     if (time <= nextTime) {
       return true;
     }
@@ -128,11 +131,6 @@ public abstract class LinearFill {
     return false;
   }
 
-  /** no more next column */
-  public void setNoMoreData() {
-    this.noMoreNext = true;
-  }
-
   private boolean nextIsNull(long time) {
     return nextTimeInCurrentColumn <= time;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
new file mode 100644
index 0000000000..5cbe7b189f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class FillOperatorTest {
+
+  @Test
+  public void batchConstantFillTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, FillOperator.class.getSimpleName());
+
+      IFill[] fillArray =
+          new IFill[] {
+            new DoubleConstantFill(520.0),
+            new DoubleConstantFill(520.0),
+            new DoubleConstantFill(520.0)
+          };
+      FillOperator fillOperator =
+          new FillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  int delta = index * 10000;
+                  TsBlockBuilder builder =
+                      new TsBlockBuilder(
+                          ImmutableList.of(
+                              TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE));
+                  // 1  1.0, null, 100.0
+                  builder.getTimeColumnBuilder().writeLong(1 + delta);
+                  builder.getColumnBuilder(0).writeDouble(1 + delta);
+                  builder.getColumnBuilder(1).appendNull();
+                  builder.getColumnBuilder(2).writeDouble(100 + delta);
+                  builder.declarePosition();
+                  // 2  2.0, 20.0, 200.0
+                  builder.getTimeColumnBuilder().writeLong(2 + delta);
+                  builder.getColumnBuilder(0).writeDouble(2 + delta);
+                  builder.getColumnBuilder(1).writeDouble(20 + delta);
+                  builder.getColumnBuilder(2).writeDouble(200 + delta);
+                  builder.declarePosition();
+                  // 3  3.0, 30.0, null
+                  builder.getTimeColumnBuilder().writeLong(3 + delta);
+                  builder.getColumnBuilder(0).writeDouble(3 + delta);
+                  builder.getColumnBuilder(1).writeDouble(30 + delta);
+                  builder.getColumnBuilder(2).appendNull();
+                  builder.declarePosition();
+                  // 4  null, 40.0, null
+                  builder.getTimeColumnBuilder().writeLong(4 + delta);
+                  builder.getColumnBuilder(0).appendNull();
+                  builder.getColumnBuilder(1).writeDouble(40 + delta);
+                  builder.getColumnBuilder(2).appendNull();
+                  builder.declarePosition();
+                  // 5  null, null, 500.0
+                  builder.getTimeColumnBuilder().writeLong(5 + delta);
+                  builder.getColumnBuilder(0).appendNull();
+                  builder.getColumnBuilder(1).appendNull();
+                  builder.getColumnBuilder(2).writeDouble(500 + delta);
+                  builder.declarePosition();
+
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 3;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 3;
+                }
+              });
+
+      int count = 0;
+      double[][][] res =
+          new double[][][] {
+            {
+              {1.0, 520.0, 100.0},
+              {2, 20, 200},
+              {3, 30, 520.0},
+              {520.0, 40, 520.0},
+              {520.0, 520.0, 500}
+            },
+            {
+              {10001, 520.0, 10100},
+              {10002, 10020, 10200},
+              {10003, 10030, 520.0},
+              {520.0, 10040, 520.0},
+              {520.0, 520.0, 10500}
+            },
+            {
+              {20001, 520.0, 20100},
+              {20002, 20020, 20200},
+              {20003, 20030, 520.0},
+              {520.0, 20040, 520.0},
+              {520.0, 520.0, 20500}
+            }
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+            {
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false}
+            },
+            {
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false}
+            },
+            {
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false}
+            }
+          };
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = i + 1 + count * 10000L;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 3; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getDouble(i), 0.00001);
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(3, count);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void batchPreviousFillTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, FillOperator.class.getSimpleName());
+
+      IFill[] fillArray =
+          new IFill[] {new IntPreviousFill(), new IntPreviousFill(), new IntPreviousFill()};
+      FillOperator fillOperator =
+          new FillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  int delta = index * 10000;
+                  TsBlockBuilder builder =
+                      new TsBlockBuilder(
+                          ImmutableList.of(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32));
+                  // 1  1, null, 100
+                  builder.getTimeColumnBuilder().writeLong(1 + delta);
+                  builder.getColumnBuilder(0).writeInt(1 + delta);
+                  builder.getColumnBuilder(1).appendNull();
+                  builder.getColumnBuilder(2).writeInt(100 + delta);
+                  builder.declarePosition();
+                  // 2  2, 20, 200
+                  builder.getTimeColumnBuilder().writeLong(2 + delta);
+                  builder.getColumnBuilder(0).writeInt(2 + delta);
+                  builder.getColumnBuilder(1).writeInt(20 + delta);
+                  builder.getColumnBuilder(2).writeInt(200 + delta);
+                  builder.declarePosition();
+                  // 3  3, 30, null
+                  builder.getTimeColumnBuilder().writeLong(3 + delta);
+                  builder.getColumnBuilder(0).writeInt(3 + delta);
+                  builder.getColumnBuilder(1).writeInt(30 + delta);
+                  builder.getColumnBuilder(2).appendNull();
+                  builder.declarePosition();
+                  // 4  null, 40, null
+                  builder.getTimeColumnBuilder().writeLong(4 + delta);
+                  builder.getColumnBuilder(0).appendNull();
+                  builder.getColumnBuilder(1).writeInt(40 + delta);
+                  builder.getColumnBuilder(2).appendNull();
+                  builder.declarePosition();
+                  // 5  null, null, 500
+                  builder.getTimeColumnBuilder().writeLong(5 + delta);
+                  builder.getColumnBuilder(0).appendNull();
+                  builder.getColumnBuilder(1).appendNull();
+                  builder.getColumnBuilder(2).writeInt(500 + delta);
+                  builder.declarePosition();
+
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 3;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 3;
+                }
+              });
+
+      int count = 0;
+      int[][][] res =
+          new int[][][] {
+            {{1, 0, 100}, {2, 20, 200}, {3, 30, 200}, {3, 40, 200}, {3, 40, 500}},
+            {
+              {10001, 40, 10100},
+              {10002, 10020, 10200},
+              {10003, 10030, 10200},
+              {10003, 10040, 10200},
+              {10003, 10040, 10500}
+            },
+            {
+              {20001, 10040, 20100},
+              {20002, 20020, 20200},
+              {20003, 20030, 20200},
+              {20003, 20040, 20200},
+              {20003, 20040, 20500}
+            }
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+            {
+              {false, true, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false}
+            },
+            {
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false}
+            },
+            {
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false},
+              {false, false, false}
+            }
+          };
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = i + 1 + count * 10000L;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 3; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getInt(i));
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(3, count);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
new file mode 100644
index 0000000000..6c17c09d4f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LinearFillOperatorTest {
+
+  @Test
+  public void batchLinearFillTest1() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+      LinearFill[] fillArray =
+          new LinearFill[] {
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill()
+          };
+      LinearFillOperator fillOperator =
+          new LinearFillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+                private final float[][][] value =
+                    new float[][][] {
+                      {
+                        {1.0f, 0.0f, 3.0f, 4.0f},
+                        {11.0f, 12.0f, 13.0f, 0.0f},
+                        {21.0f, 22.0f, 0.0f, 0.0f},
+                        {0.0f, 32.0f, 0.0f, 0.0f},
+                        {0.0f, 0.0f, 43.0f, 0.0f}
+                      },
+                      {
+                        {51.0f, 0.0f, 53.0f, 0.0f},
+                        {61.0f, 62.0f, 63.0f, 0.0f},
+                        {71.0f, 72.0f, 0.0f, 74.0f},
+                        {0.0f, 82.0f, 0.0f, 0.0f},
+                        {0.0f, 0.0f, 93.0f, 0.0f}
+                      },
+                      {
+                        {101.0f, 0.0f, 103.0f, 0.0f},
+                        {111.0f, 112.0f, 113.0f, 114.0f},
+                        {121.0f, 122.0f, 0.0f, 124.0f},
+                        {0.0f, 132.0f, 0.0f, 0.0f},
+                        {0.0f, 0.0f, 143.0f, 0.0f}
+                      }
+                    };
+                final boolean[][][] isNull =
+                    new boolean[][][] {
+                      {
+                        {false, true, false, false},
+                        {false, false, false, true},
+                        {false, false, true, true},
+                        {true, false, true, true},
+                        {true, true, false, true}
+                      },
+                      {
+                        {false, true, false, true},
+                        {false, false, false, true},
+                        {false, false, true, false},
+                        {true, false, true, true},
+                        {true, true, false, true}
+                      },
+                      {
+                        {false, true, false, true},
+                        {false, false, false, false},
+                        {false, false, true, false},
+                        {true, false, true, true},
+                        {true, true, false, true}
+                      }
+                    };
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  TsBlockBuilder builder =
+                      new TsBlockBuilder(
+                          ImmutableList.of(
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT));
+                  for (int i = 0; i < 5; i++) {
+                    builder.getTimeColumnBuilder().writeLong(i + index * 5L);
+                    for (int j = 0; j < 4; j++) {
+                      if (isNull[index][i][j]) {
+                        builder.getColumnBuilder(j).appendNull();
+                      } else {
+                        builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+                      }
+                    }
+                    builder.declarePosition();
+                  }
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 3;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 3;
+                }
+              });
+
+      int count = 0;
+      float[][][] res =
+          new float[][][] {
+            {
+              {1.0f, 0.0f, 3.0f, 4.0f},
+              {11.0f, 12.0f, 13.0f, 39.0f},
+              {21.0f, 22.0f, 28.0f, 39.0f},
+              {36.0f, 32.0f, 28.0f, 39.0f},
+              {36.0f, 47.0f, 43.0f, 39.0f}
+            },
+            {
+              {51.0f, 47.0f, 53.0f, 39.0f},
+              {61.0f, 62.0f, 63.0f, 39.0f},
+              {71.0f, 72.0f, 78.0f, 74.0f},
+              {86.0f, 82.0f, 78.0f, 94.0f},
+              {86.0f, 97.0f, 93.0f, 94.0f}
+            },
+            {
+              {101.0f, 97.0f, 103.0f, 94.0f},
+              {111.0f, 112.0f, 113.0f, 114.0f},
+              {121.0f, 122.0f, 128.0f, 124.0f},
+              {0.0f, 132.0f, 128.0f, 0.0f},
+              {0.0f, 0.0f, 143.0f, 0.0f}
+            }
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+            {
+              {false, true, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false}
+            },
+            {
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false}
+            },
+            {
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {true, false, false, true},
+              {true, true, false, true}
+            }
+          };
+
+      boolean[] nullBlock = new boolean[] {true, false, false, false};
+      int nullBlockIndex = 0;
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        assertEquals(nullBlock[nullBlockIndex++], block == null);
+        if (block == null) {
+          continue;
+        }
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = i + count * 5L;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 4; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 0.00001f);
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(3, count);
+      assertEquals(4, nullBlockIndex);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void batchLinearFillTest2() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+      LinearFill[] fillArray =
+          new LinearFill[] {
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill(),
+            new FloatLinearFill()
+          };
+      LinearFillOperator fillOperator =
+          new LinearFillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+                private final float[][][] value =
+                    new float[][][] {
+                      {
+                        {1.0f, 0.0f, 3.0f, 4.0f},
+                        {11.0f, 12.0f, 13.0f, 0.0f},
+                        {21.0f, 22.0f, 0.0f, 0.0f},
+                        {0.0f, 32.0f, 0.0f, 0.0f},
+                        {0.0f, 0.0f, 0.0f, 0.0f}
+                      },
+                      {
+                        {51.0f, 0.0f, 0.0f, 0.0f},
+                        {61.0f, 62.0f, 0.0f, 0.0f},
+                        {71.0f, 72.0f, 0.0f, 74.0f},
+                        {0.0f, 82.0f, 0.0f, 0.0f},
+                        {0.0f, 0.0f, 0.0f, 0.0f}
+                      },
+                      {
+                        {101.0f, 0.0f, 103.0f, 0.0f},
+                        {111.0f, 112.0f, 0.0f, 114.0f},
+                        {121.0f, 122.0f, 0.0f, 124.0f},
+                        {0.0f, 132.0f, 0.0f, 0.0f},
+                        {0.0f, 0.0f, 0.0f, 0.0f}
+                      }
+                    };
+                final boolean[][][] isNull =
+                    new boolean[][][] {
+                      {
+                        {false, true, false, false},
+                        {false, false, false, true},
+                        {false, false, true, true},
+                        {true, false, true, true},
+                        {true, true, true, true}
+                      },
+                      {
+                        {false, true, true, true},
+                        {false, false, true, true},
+                        {false, false, true, false},
+                        {true, false, true, true},
+                        {true, true, true, true}
+                      },
+                      {
+                        {false, true, false, true},
+                        {false, false, true, false},
+                        {false, false, true, false},
+                        {true, false, true, true},
+                        {true, true, true, true}
+                      }
+                    };
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  TsBlockBuilder builder =
+                      new TsBlockBuilder(
+                          ImmutableList.of(
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT));
+                  for (int i = 0; i < 5; i++) {
+                    builder.getTimeColumnBuilder().writeLong(i + index * 5L);
+                    for (int j = 0; j < 4; j++) {
+                      if (isNull[index][i][j]) {
+                        builder.getColumnBuilder(j).appendNull();
+                      } else {
+                        builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+                      }
+                    }
+                    builder.declarePosition();
+                  }
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 3;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 3;
+                }
+              });
+
+      int count = 0;
+      float[][][] res =
+          new float[][][] {
+            {
+              {1.0f, 0.0f, 3.0f, 4.0f},
+              {11.0f, 12.0f, 13.0f, 39.0f},
+              {21.0f, 22.0f, 58.0f, 39.0f},
+              {36.0f, 32.0f, 58.0f, 39.0f},
+              {36.0f, 47.0f, 58.0f, 39.0f}
+            },
+            {
+              {51.0f, 47.0f, 58.0f, 39.0f},
+              {61.0f, 62.0f, 58.0f, 39.0f},
+              {71.0f, 72.0f, 58.0f, 74.0f},
+              {86.0f, 82.0f, 58.0f, 94.0f},
+              {86.0f, 97.0f, 58.0f, 94.0f}
+            },
+            {
+              {101.0f, 97.0f, 103.0f, 94.0f},
+              {111.0f, 112.0f, 0.0f, 114.0f},
+              {121.0f, 122.0f, 0.0f, 124.0f},
+              {0.0f, 132.0f, 0.0f, 0.0f},
+              {0.0f, 0.0f, 0.0f, 0.0f}
+            }
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+            {
+              {false, true, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false}
+            },
+            {
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false},
+              {false, false, false, false}
+            },
+            {
+              {false, false, false, false},
+              {false, false, true, false},
+              {false, false, true, false},
+              {true, false, true, true},
+              {true, true, true, true}
+            }
+          };
+
+      boolean[] nullBlock = new boolean[] {true, true, false, false, false};
+      int nullBlockIndex = 0;
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        assertEquals(nullBlock[nullBlockIndex++], block == null);
+        if (block == null) {
+          continue;
+        }
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = i + count * 5L;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 4; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 0.00001f);
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(3, count);
+      assertEquals(5, nullBlockIndex);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}