You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/07 12:28:48 UTC

[iotdb] 03/03: Add LinearFill

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e391866e417770626a5c7e4646f98814c0f870ff
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat May 7 20:28:25 2022 +0800

    Add LinearFill
---
 .../execution/operator/process/FillOperator.java   |  49 +++++-
 .../execution/operator/process/LimitOperator.java  |   3 +
 .../operator/process/LinearFillOperator.java       | 157 ++++++++++++++++++
 .../execution/operator/process/OffsetOperator.java |   3 +
 .../mpp/execution/operator/process/fill/IFill.java |   3 +-
 .../process/fill/constant/BinaryConstantFill.java  |  23 ++-
 .../process/fill/constant/BooleanConstantFill.java |  23 ++-
 .../process/fill/constant/DoubleConstantFill.java  |  23 ++-
 .../process/fill/constant/FloatConstantFill.java   |  23 ++-
 .../process/fill/constant/IntConstantFill.java     |  23 ++-
 .../process/fill/constant/LongConstantFill.java    |  23 ++-
 .../process/fill/linear/DoubleLinearFill.java      |  94 +++++++++++
 .../process/fill/linear/FloatLinearFill.java       |  94 +++++++++++
 .../process/fill/linear/IntLinearFill.java         |  94 +++++++++++
 .../operator/process/fill/linear/LinearFill.java   | 179 +++++++++++++++++++++
 .../process/fill/linear/LongLinearFill.java        |  94 +++++++++++
 .../process/fill/previous/BinaryPreviousFill.java  |  31 ++--
 .../process/fill/previous/BooleanPreviousFill.java |  31 ++--
 .../process/fill/previous/DoublePreviousFill.java  |  31 ++--
 .../process/fill/previous/FloatPreviousFill.java   |  31 ++--
 .../process/fill/previous/IntPreviousFill.java     |  31 ++--
 .../process/fill/previous/LongPreviousFill.java    |  31 ++--
 22 files changed, 876 insertions(+), 218 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index 02657f0b56..14281036ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -18,39 +18,76 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 public class FillOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final IFill[] fillArray;
+  private final Operator child;
+  private final int outputColumnCount;
+
+  public FillOperator(OperatorContext operatorContext, IFill[] fillArray, Operator child) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    checkArgument(
+        fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
+    this.fillArray = fillArray;
+    this.child = requireNonNull(child, "child operator is null");
+    this.outputColumnCount = fillArray.length;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    return child.isBlocked();
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    TsBlock block = child.next();
+    if (block == null) {
+      return null;
+    }
+
+    checkArgument(
+        outputColumnCount == block.getValueColumnCount(),
+        "outputColumnCount is not equal to value column count of child operator's TsBlock");
+
+    Column[] valueColumns = new Column[outputColumnCount];
+
+    for (int i = 0; i < outputColumnCount; i++) {
+      valueColumns[i] = fillArray[i].fill(block.getColumn(i));
+    }
+
+    return TsBlock.wrapBlocksWithoutCopy(
+        block.getPositionCount(), block.getTimeColumn(), valueColumns);
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return child.hasNext();
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return child.isFinished();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 9973e256ec..4ae909a2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -53,6 +53,9 @@ public class LimitOperator implements ProcessOperator {
   @Override
   public TsBlock next() {
     TsBlock block = child.next();
+    if (block == null) {
+      return null;
+    }
     TsBlock res = block;
     if (block.getPositionCount() <= remainingLimit) {
       remainingLimit -= block.getPositionCount();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
new file mode 100644
index 0000000000..e7b523afc1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class LinearFillOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final LinearFill[] fillArray;
+  private final Operator child;
+  private final int outputColumnCount;
+  private final LinkedList<TsBlock> cachedTsBlock;
+  private final Column[] cachedFilledValueColumns;
+
+  private int currentFilledColumnIndex;
+
+  public LinearFillOperator(
+      OperatorContext operatorContext, LinearFill[] fillArray, Operator child) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    checkArgument(
+        fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
+    this.fillArray = fillArray;
+    this.child = requireNonNull(child, "child operator is null");
+    this.outputColumnCount = fillArray.length;
+    this.cachedTsBlock = new LinkedList<>();
+    this.cachedFilledValueColumns = new Column[outputColumnCount];
+    this.currentFilledColumnIndex = 0;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+
+    // make sure we call child.next() at most once
+    boolean alreadyCallNext = false;
+    if (cachedTsBlock.isEmpty()) {
+      alreadyCallNext = true;
+      TsBlock nextBlock = child.next();
+      // child returned null or an empty block, so we just return it as-is
+      if (nextBlock == null || nextBlock.isEmpty()) {
+        return nextBlock;
+      } else { // otherwise, we cache it
+        cachedTsBlock.addLast(nextBlock);
+      }
+    }
+
+    TsBlock block = cachedTsBlock.getFirst();
+    long currentEndTime = block.getEndTime();
+    // use cached TsBlock to keep filling remaining column
+    while (currentFilledColumnIndex < outputColumnCount) {
+      if (fillArray[currentFilledColumnIndex].needPrepareForNext(
+          currentEndTime, block.getColumn(currentFilledColumnIndex))) {
+        if (!alreadyCallNext) {
+          alreadyCallNext = true;
+          TsBlock nextBlock = child.next();
+          // child returned null or an empty block, so we just return it as-is
+          if (nextBlock == null || nextBlock.isEmpty()) {
+            return nextBlock;
+          } else { // otherwise, we cache it
+            cachedTsBlock.addLast(nextBlock);
+          }
+          if (fillArray[currentFilledColumnIndex].prepareForNext(
+              currentEndTime,
+              cachedTsBlock.getLast().getTimeColumn(),
+              cachedTsBlock.getLast().getColumn(currentFilledColumnIndex))) {
+            cachedFilledValueColumns[currentFilledColumnIndex] =
+                fillArray[currentFilledColumnIndex].fill(
+                    block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+          } else { // more TsBlocks are needed to fill the current column, so the calculation is
+            // not finished, and we just return null
+            return null;
+          }
+        } else { // more TsBlocks are needed to fill the current column, so the calculation is
+          // not finished, and we just return null
+          return null;
+        }
+      } else {
+        cachedFilledValueColumns[currentFilledColumnIndex] =
+            fillArray[currentFilledColumnIndex].fill(
+                block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+      }
+      currentFilledColumnIndex++;
+    }
+
+    TsBlock originTsBlock = cachedTsBlock.removeFirst();
+    checkArgument(
+        outputColumnCount == originTsBlock.getValueColumnCount(),
+        "outputColumnCount is not equal to value column count of child operator's TsBlock");
+    TsBlock result =
+        TsBlock.wrapBlocksWithoutCopy(
+            originTsBlock.getPositionCount(),
+            originTsBlock.getTimeColumn(),
+            cachedFilledValueColumns);
+    Arrays.fill(cachedFilledValueColumns, null);
+    currentFilledColumnIndex = 0;
+    return result;
+  }
+
+  @Override
+  public boolean hasNext() {
+    boolean hasNext = child.hasNext();
+    if (!hasNext) {
+      for (LinearFill linearFill : fillArray) {
+        linearFill.setNoMoreData();
+      }
+    }
+    return !cachedTsBlock.isEmpty() || hasNext;
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return cachedTsBlock.isEmpty() && child.isFinished();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 04a7915e52..b2136a5b4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -53,6 +53,9 @@ public class OffsetOperator implements ProcessOperator {
   @Override
   public TsBlock next() {
     TsBlock block = child.next();
+    if (block == null) {
+      return null;
+    }
     if (remainingOffset > 0) {
       int offset = Math.min((int) remainingOffset, block.getPositionCount());
       remainingOffset -= offset;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
index b7e92fe864..d43325a21c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
@@ -18,10 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill;
 
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
 public interface IFill {
 
-  Column fill(TsBlock tsBlock);
+  Column fill(Column valueColumn);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
index 9cf6be1ce5..f590adced3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -31,35 +30,31 @@ public class BinaryConstantFill implements IFill {
 
   // fill value
   private final Binary value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn, size of it must be 1
   private final Binary[] valueArray;
 
-  public BinaryConstantFill(Binary value, int columnIndex) {
+  public BinaryConstantFill(Binary value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new Binary[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), valueArray), size);
     } else {
       Binary[] array = new Binary[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getBinary(i);
+          array[i] = valueColumn.getBinary(i);
         }
       }
       return new BinaryColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
index 4d60c2cb60..7604248242 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class BooleanConstantFill implements IFill {
 
   // fill value
   private final boolean value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final boolean[] valueArray;
 
-  public BooleanConstantFill(boolean value, int columnIndex) {
+  public BooleanConstantFill(boolean value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new boolean[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), valueArray), size);
     } else {
       boolean[] array = new boolean[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getBoolean(i);
+          array[i] = valueColumn.getBoolean(i);
         }
       }
       return new BooleanColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
index d3a9244b9c..4615619312 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class DoubleConstantFill implements IFill {
 
   // fill value
   private final double value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final double[] valueArray;
 
-  public DoubleConstantFill(double value, int columnIndex) {
+  public DoubleConstantFill(double value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new double[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), valueArray), size);
     } else {
       double[] array = new double[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getDouble(i);
+          array[i] = valueColumn.getDouble(i);
         }
       }
       return new DoubleColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
index d58addee1f..f7d0dc0f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class FloatConstantFill implements IFill {
 
   // fill value
   private final float value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final float[] valueArray;
 
-  public FloatConstantFill(float value, int columnIndex) {
+  public FloatConstantFill(float value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new float[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), valueArray), size);
     } else {
       float[] array = new float[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getFloat(i);
+          array[i] = valueColumn.getFloat(i);
         }
       }
       return new FloatColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
index 85c21f2e4c..bbe34abf6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class IntConstantFill implements IFill {
 
   // fill value
   private final int value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final int[] valueArray;
 
-  public IntConstantFill(int value, int columnIndex) {
+  public IntConstantFill(int value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new int[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), valueArray), size);
     } else {
       int[] array = new int[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getInt(i);
+          array[i] = valueColumn.getInt(i);
         }
       }
       return new IntColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
index 1aae78a67f..72721980da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class LongConstantFill implements IFill {
 
   // fill value
   private final long value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final long[] valueArray;
 
-  public LongConstantFill(long value, int columnIndex) {
+  public LongConstantFill(long value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new long[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), valueArray), size);
     } else {
       long[] array = new long[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getLong(i);
+          array[i] = valueColumn.getLong(i);
         }
       }
       return new LongColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
new file mode 100644
index 0000000000..6dd68192c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+
+import java.util.Optional;
+
+public class DoubleLinearFill extends LinearFill {
+
+  // previous value
+  private double previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private double nextValue;
+
+  private double nextValueInCurrentColumn;
+
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((double[]) array)[index] = column.getDouble(index);
+  }
+
+  @Override
+  void fillValue(Object array, int index) {
+    ((double[]) array)[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new double[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return DoubleColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  @Override
+  Column createFilledValueColumn() {
+    double filledValue = getFilledValue();
+    return new DoubleColumn(1, Optional.empty(), new double[] {filledValue});
+  }
+
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new DoubleColumn(size, Optional.empty(), (double[]) array);
+    } else {
+      return new DoubleColumn(size, Optional.of(isNull), (double[]) array);
+    }
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getDouble(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getDouble(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getDouble(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  private double getFilledValue() {
+    return (previousValue + nextValueInCurrentColumn) / 2.0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
new file mode 100644
index 0000000000..2cc80f2fd0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+
+import java.util.Optional;
+
+/**
+ * {@link LinearFill} implementation for columns whose values are read with {@code getFloat}.
+ *
+ * <p>A gap is filled with the arithmetic mean of the previous value and the next value; the
+ * timestamps of the two neighbours are not used as weights.
+ */
+public class FloatLinearFill extends LinearFill {
+
+  // last non-null value recorded through updatePreviousValue
+  private float previousValue;
+  // non-null value recorded through updateNextValue
+  private float nextValue;
+  // upper neighbour that getFilledValue currently averages with previousValue
+  private float nextValueInCurrentColumn;
+
+  /** Copies the value at {@code index} of {@code column} into the float array. */
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    float[] values = (float[]) array;
+    values[index] = column.getFloat(index);
+  }
+
+  /** Stores the computed fill value at {@code index} of the float array. */
+  @Override
+  void fillValue(Object array, int index) {
+    float[] values = (float[]) array;
+    values[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new float[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return FloatColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  /** Builds a single-position column, without a null mask, holding the current fill value. */
+  @Override
+  Column createFilledValueColumn() {
+    return new FloatColumn(1, Optional.empty(), new float[] {getFilledValue()});
+  }
+
+  /** Wraps {@code array} in a column; {@code isNull} is attached unless {@code nonNullValue}. */
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    Optional<boolean[]> nullMask = nonNullValue ? Optional.empty() : Optional.of(isNull);
+    return new FloatColumn(size, nullMask, (float[]) array);
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    this.previousValue = column.getFloat(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    nextValue = nextValueColumn.getFloat(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    nextValueInCurrentColumn = nextValueColumn.getFloat(index);
+  }
+
+  /** Falls back to the value recorded by {@code updateNextValue}. */
+  @Override
+  void updateNextValueInCurrentColumn() {
+    nextValueInCurrentColumn = nextValue;
+  }
+
+  /** Returns the mean of {@code previousValue} and {@code nextValueInCurrentColumn}. */
+  private float getFilledValue() {
+    return (previousValue + nextValueInCurrentColumn) / 2.0f;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
new file mode 100644
index 0000000000..97ea4d05cc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+
+import java.util.Optional;
+
+/**
+ * {@link LinearFill} implementation for columns whose values are read with {@code getInt}.
+ *
+ * <p>A gap is filled with the mean of the previous value and the next value, truncated toward
+ * zero; the timestamps of the two neighbours are not used as weights.
+ */
+public class IntLinearFill extends LinearFill {
+
+  // last non-null value recorded through updatePreviousValue
+  private int previousValue;
+  // non-null value recorded through updateNextValue
+  private int nextValue;
+  // upper neighbour that getFilledValue currently averages with previousValue
+  private int nextValueInCurrentColumn;
+
+  /** Copies the value at {@code index} of {@code column} into the int array. */
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((int[]) array)[index] = column.getInt(index);
+  }
+
+  /** Stores the computed fill value at {@code index} of the int array. */
+  @Override
+  void fillValue(Object array, int index) {
+    ((int[]) array)[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new int[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return IntColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  /** Builds a single-position column, without a null mask, holding the current fill value. */
+  @Override
+  Column createFilledValueColumn() {
+    int filledValue = getFilledValue();
+    return new IntColumn(1, Optional.empty(), new int[] {filledValue});
+  }
+
+  /** Wraps {@code array} in a column; {@code isNull} is attached unless {@code nonNullValue}. */
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new IntColumn(size, Optional.empty(), (int[]) array);
+    } else {
+      return new IntColumn(size, Optional.of(isNull), (int[]) array);
+    }
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getInt(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getInt(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getInt(index);
+  }
+
+  /** Falls back to the value recorded by {@code updateNextValue}. */
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  /**
+   * Returns the mean of {@code previousValue} and {@code nextValueInCurrentColumn}, truncated
+   * toward zero. The sum is computed as a {@code long} so it cannot wrap around when the two
+   * values add up to more than the {@code int} range.
+   */
+  private int getFilledValue() {
+    long sum = (long) previousValue + nextValueInCurrentColumn;
+    return (int) (sum / 2);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
new file mode 100644
index 0000000000..4a137c9eba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+/**
+ * Base class of the linear fill implementations. It tracks timestamps and decides which rows can
+ * be filled; subclasses hold the typed values and build the typed columns through the abstract
+ * methods declared at the bottom of this class.
+ *
+ * <p>A null row is filled only when both a previous and a later non-null value are known. The
+ * later value is searched in the rest of the current column first; if none is found there, the
+ * value recorded by {@link #prepareForNext} is used.
+ */
+public abstract class LinearFill {
+
+  // true until the first non-null value has been seen, i.e. there is no previous value yet
+  protected boolean previousIsNull = true;
+  // time of the next value recorded by prepareForNext; Long.MIN_VALUE until one is recorded
+  protected long nextTime = Long.MIN_VALUE;
+  // time of the next value used for the current column. It starts at Long.MIN_VALUE, like
+  // nextTime: with the implicit default of 0, a null row whose timestamp is negative would be
+  // treated as already prepared and could be filled from a next value that was never set.
+  protected long nextTimeInCurrentColumn = Long.MIN_VALUE;
+  // set by setNoMoreData once there is no further column to take a next value from
+  protected boolean noMoreNext = false;
+
+  /**
+   * Fills the null rows of {@code valueColumn}.
+   *
+   * <p>Before we call this method, we need to make sure the nextValue has been prepared or
+   * noMoreNext has been set to true.
+   *
+   * @param timeColumn TimeColumn of valueColumn
+   * @param valueColumn valueColumn that need to be filled
+   * @return Value Column that has been filled; valueColumn itself if it is empty or reports that
+   *     it has no null value
+   */
+  public Column fill(TimeColumn timeColumn, Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        updatePreviousValue(valueColumn, valueColumn.getPositionCount() - 1);
+      }
+      return valueColumn;
+    }
+
+    // if its values are all null
+    if (valueColumn instanceof RunLengthEncodedColumn) {
+      // previous value is null or next value is null, we just return NULL_VALUE_BLOCK
+      if (previousIsNull || nextTime < timeColumn.getStartTime()) {
+        return new RunLengthEncodedColumn(createNullValueColumn(), size);
+      } else {
+        prepareForNextValueInCurrentColumn(
+            timeColumn.getEndTime(), timeColumn.getPositionCount() - 1, timeColumn, valueColumn);
+        return new RunLengthEncodedColumn(createFilledValueColumn(), size);
+      }
+    } else {
+      Object array = createValueArray(size);
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+
+      for (int i = 0; i < size; i++) {
+        // current value is null, we need to fill it
+        if (valueColumn.isNull(i)) {
+          long currentTime = timeColumn.getLong(i);
+          prepareForNextValueInCurrentColumn(currentTime, i + 1, timeColumn, valueColumn);
+          // we don't fill it, if either previous value or next value is null
+          if (previousIsNull || nextIsNull(currentTime)) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            // fill value using previous and next value
+            fillValue(array, i);
+          }
+        } else { // current is not null
+          // fill value using its own value
+          fillValue(valueColumn, i, array);
+          // update previous value
+          updatePreviousValue(valueColumn, i);
+          previousIsNull = false;
+        }
+      }
+      return createFilledValueColumn(array, isNull, nonNullValue, size);
+    }
+  }
+
+  /**
+   * Checks whether another TsBlock must be read before valueColumn can be filled.
+   *
+   * @param time end time of current valueColumn that need to be filled
+   * @param valueColumn valueColumn that need to be filled
+   * @return true if valueColumn can't be filled using current information, and we need to get next
+   *     TsBlock and then call prepareForNext; false if valueColumn can be filled using current
+   *     information, and we can directly call fill() function
+   */
+  public boolean needPrepareForNext(long time, Column valueColumn) {
+    // NOTE(review): reads the last row, so a non-empty valueColumn is presumably expected
+    return !noMoreNext && time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+  }
+
+  /**
+   * Records the first non-null value of the next TsBlock as the next value, unless the recorded
+   * next value is already at or after {@code time}.
+   *
+   * @param time end time of current valueColumn that need to be filled
+   * @param nextTimeColumn TimeColumn of next TsBlock
+   * @param nextValueColumn Value Column of next TsBlock
+   * @return true if we get enough information to fill current column, and we can stop getting next
+   *     TsBlock and calling prepareForNext; false if we still don't get enough information to fill
+   *     current column, and still need to keep getting next TsBlock and then call prepareForNext
+   */
+  public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
+    if (time <= nextTime) {
+      return true;
+    }
+
+    for (int i = 0; i < nextValueColumn.getPositionCount(); i++) {
+      if (!nextValueColumn.isNull(i)) {
+        updateNextValue(nextValueColumn, i);
+        this.nextTime = nextTimeColumn.getLong(i);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** no more next column */
+  public void setNoMoreData() {
+    this.noMoreNext = true;
+  }
+
+  /** Returns true if no prepared next value lies after {@code time}. */
+  private boolean nextIsNull(long time) {
+    return nextTimeInCurrentColumn <= time;
+  }
+
+  /**
+   * Makes nextTimeInCurrentColumn and the matching value describe the first non-null row at or
+   * after {@code startIndex}; if the current column has none, the value recorded by
+   * prepareForNext is used. Does nothing when the prepared time is already at or after {@code
+   * time}.
+   */
+  private void prepareForNextValueInCurrentColumn(
+      long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
+    if (time <= nextTimeInCurrentColumn) {
+      return;
+    }
+    for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        this.nextTimeInCurrentColumn = timeColumn.getLong(i);
+        updateNextValueInCurrentColumn(valueColumn, i);
+        return;
+      }
+    }
+
+    // current column's value is not enough for filling, we should use value of next Column
+    this.nextTimeInCurrentColumn = this.nextTime;
+    updateNextValueInCurrentColumn();
+  }
+
+  /** Copies the value at {@code index} of {@code column} into {@code array}. */
+  abstract void fillValue(Column column, int index, Object array);
+
+  /** Stores the filled value at {@code index} of {@code array}. */
+  abstract void fillValue(Object array, int index);
+
+  /** Creates the value array for {@code size} rows that the fillValue methods write into. */
+  abstract Object createValueArray(int size);
+
+  /** Creates the column used as the value of an all-null RunLengthEncodedColumn. */
+  abstract Column createNullValueColumn();
+
+  /** Creates the column used as the value of a fully filled RunLengthEncodedColumn. */
+  abstract Column createFilledValueColumn();
+
+  /** Creates the result column from the filled {@code array} and the {@code isNull} flags. */
+  abstract Column createFilledValueColumn(
+      Object array, boolean[] isNull, boolean nonNullValue, int size);
+
+  /** Remembers the value at {@code index} of {@code column} as the previous value. */
+  abstract void updatePreviousValue(Column column, int index);
+
+  /** Remembers the value at {@code index} of {@code nextValueColumn} as the next value. */
+  abstract void updateNextValue(Column nextValueColumn, int index);
+
+  /** update nextValueInCurrentColumn using the value at {@code index} of the given column */
+  abstract void updateNextValueInCurrentColumn(Column nextValueColumn, int index);
+
+  /** update nextValueInCurrentColumn using value of next Column */
+  abstract void updateNextValueInCurrentColumn();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
new file mode 100644
index 0000000000..dd0e3e75fb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+
+import java.util.Optional;
+
+/**
+ * {@link LinearFill} implementation for columns whose values are read with {@code getLong}.
+ *
+ * <p>A gap is filled with the mean of the previous value and the next value, truncated toward
+ * zero; the timestamps of the two neighbours are not used as weights.
+ */
+public class LongLinearFill extends LinearFill {
+
+  // last non-null value recorded through updatePreviousValue
+  private long previousValue;
+  // non-null value recorded through updateNextValue
+  private long nextValue;
+  // upper neighbour that getFilledValue currently averages with previousValue
+  private long nextValueInCurrentColumn;
+
+  /** Copies the value at {@code index} of {@code column} into the long array. */
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((long[]) array)[index] = column.getLong(index);
+  }
+
+  /** Stores the computed fill value at {@code index} of the long array. */
+  @Override
+  void fillValue(Object array, int index) {
+    ((long[]) array)[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new long[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return LongColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  /** Builds a single-position column, without a null mask, holding the current fill value. */
+  @Override
+  Column createFilledValueColumn() {
+    long filledValue = getFilledValue();
+    return new LongColumn(1, Optional.empty(), new long[] {filledValue});
+  }
+
+  /** Wraps {@code array} in a column; {@code isNull} is attached unless {@code nonNullValue}. */
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new LongColumn(size, Optional.empty(), (long[]) array);
+    } else {
+      return new LongColumn(size, Optional.of(isNull), (long[]) array);
+    }
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getLong(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getLong(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getLong(index);
+  }
+
+  /** Falls back to the value recorded by {@code updateNextValue}. */
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  /**
+   * Returns the mean of {@code previousValue} and {@code nextValueInCurrentColumn}, truncated
+   * toward zero, without letting the intermediate sum overflow.
+   */
+  private long getFilledValue() {
+    long previous = previousValue;
+    long next = nextValueInCurrentColumn;
+    if ((previous ^ next) < 0) {
+      // opposite signs: the plain sum always fits in a long
+      return (previous + next) / 2L;
+    }
+    // same sign: halve each operand, then add the carry produced by the two remainders
+    return previous / 2L + next / 2L + (previous % 2L + next % 2L) / 2L;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
index 897cd21d7a..e6cbf22b4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -33,35 +32,25 @@ public class BinaryPreviousFill implements IFill {
   // previous value
   private Binary value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public BinaryPreviousFill(Binary value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getBinary(size - 1);
+        value = valueColumn.getBinary(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(BinaryColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getBinary(size - 1);
         return new RunLengthEncodedColumn(
             new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
       }
@@ -71,7 +60,7 @@ public class BinaryPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -79,7 +68,7 @@ public class BinaryPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getBinary(i);
+          array[i] = valueColumn.getBinary(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
index a1f3c5a9e7..a7a1da249c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -32,35 +31,25 @@ public class BooleanPreviousFill implements IFill {
   // previous value
   private boolean value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public BooleanPreviousFill(boolean value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getBoolean(size - 1);
+        value = valueColumn.getBoolean(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(BooleanColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getBoolean(size - 1);
         return new RunLengthEncodedColumn(
             new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class BooleanPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class BooleanPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getBoolean(i);
+          array[i] = valueColumn.getBoolean(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
index e113ffffd5..fd0e1892ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -32,35 +31,25 @@ public class DoublePreviousFill implements IFill {
   // previous value
   private double value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public DoublePreviousFill(double value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getDouble(size - 1);
+        value = valueColumn.getDouble(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(DoubleColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getDouble(size - 1);
         return new RunLengthEncodedColumn(
             new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class DoublePreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class DoublePreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getDouble(i);
+          array[i] = valueColumn.getDouble(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
index d482430a6d..9c8f221f72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -32,35 +31,25 @@ public class FloatPreviousFill implements IFill {
   // previous value
   private float value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public FloatPreviousFill(float value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getFloat(size - 1);
+        value = valueColumn.getFloat(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(FloatColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getFloat(size - 1);
         return new RunLengthEncodedColumn(
             new FloatColumn(1, Optional.empty(), new float[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class FloatPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class FloatPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getFloat(i);
+          array[i] = valueColumn.getFloat(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
index 6e95f028b1..a444dedc05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -32,35 +31,25 @@ public class IntPreviousFill implements IFill {
   // previous value
   private int value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public IntPreviousFill(int value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if valueColumn cannot contain any null value, or it is empty, just return it as is
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getInt(size - 1);
+        value = valueColumn.getInt(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(IntColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getInt(size - 1);
         return new RunLengthEncodedColumn(
             new IntColumn(1, Optional.empty(), new int[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class IntPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class IntPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getInt(i);
+          array[i] = valueColumn.getInt(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
index 116f616aab..c145cc007f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -32,35 +31,25 @@ public class LongPreviousFill implements IFill {
   // previous value
   private long value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public LongPreviousFill(long value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if valueColumn cannot contain any null value, or it is empty, just return it as is
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getLong(size - 1);
+        value = valueColumn.getLong(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(LongColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getLong(size - 1);
         return new RunLengthEncodedColumn(
             new LongColumn(1, Optional.empty(), new long[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class LongPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class LongPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getLong(i);
+          array[i] = valueColumn.getLong(i);
           value = array[i];
           previousIsNull = false;
         }