You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/07 12:28:45 UTC

[iotdb] branch FillOperator created (now e391866e41)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at e391866e41 Add LinearFill

This branch includes the following new commits:

     new 56fdd47cd8 [IOTDB-3038] Implementation of fill operator
     new d2fe7abbea spotless
     new e391866e41 Add LinearFill

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 03/03: Add LinearFill

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e391866e417770626a5c7e4646f98814c0f870ff
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat May 7 20:28:25 2022 +0800

    Add LinearFill
---
 .../execution/operator/process/FillOperator.java   |  49 +++++-
 .../execution/operator/process/LimitOperator.java  |   3 +
 .../operator/process/LinearFillOperator.java       | 157 ++++++++++++++++++
 .../execution/operator/process/OffsetOperator.java |   3 +
 .../mpp/execution/operator/process/fill/IFill.java |   3 +-
 .../process/fill/constant/BinaryConstantFill.java  |  23 ++-
 .../process/fill/constant/BooleanConstantFill.java |  23 ++-
 .../process/fill/constant/DoubleConstantFill.java  |  23 ++-
 .../process/fill/constant/FloatConstantFill.java   |  23 ++-
 .../process/fill/constant/IntConstantFill.java     |  23 ++-
 .../process/fill/constant/LongConstantFill.java    |  23 ++-
 .../process/fill/linear/DoubleLinearFill.java      |  94 +++++++++++
 .../process/fill/linear/FloatLinearFill.java       |  94 +++++++++++
 .../process/fill/linear/IntLinearFill.java         |  94 +++++++++++
 .../operator/process/fill/linear/LinearFill.java   | 179 +++++++++++++++++++++
 .../process/fill/linear/LongLinearFill.java        |  94 +++++++++++
 .../process/fill/previous/BinaryPreviousFill.java  |  31 ++--
 .../process/fill/previous/BooleanPreviousFill.java |  31 ++--
 .../process/fill/previous/DoublePreviousFill.java  |  31 ++--
 .../process/fill/previous/FloatPreviousFill.java   |  31 ++--
 .../process/fill/previous/IntPreviousFill.java     |  31 ++--
 .../process/fill/previous/LongPreviousFill.java    |  31 ++--
 22 files changed, 876 insertions(+), 218 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index 02657f0b56..14281036ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -18,39 +18,76 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 public class FillOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final IFill[] fillArray;
+  private final Operator child;
+  private final int outputColumnCount;
+
+  public FillOperator(OperatorContext operatorContext, IFill[] fillArray, Operator child) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    checkArgument(
+        fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
+    this.fillArray = fillArray;
+    this.child = requireNonNull(child, "child operator is null");
+    this.outputColumnCount = fillArray.length;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    return child.isBlocked();
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    TsBlock block = child.next();
+    if (block == null) {
+      return null;
+    }
+
+    checkArgument(
+        outputColumnCount == block.getValueColumnCount(),
+        "outputColumnCount is not equal to value column count of child operator's TsBlock");
+
+    Column[] valueColumns = new Column[outputColumnCount];
+
+    for (int i = 0; i < outputColumnCount; i++) {
+      valueColumns[i] = fillArray[i].fill(block.getColumn(i));
+    }
+
+    return TsBlock.wrapBlocksWithoutCopy(
+        block.getPositionCount(), block.getTimeColumn(), valueColumns);
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    return child.hasNext();
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return child.isFinished();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 9973e256ec..4ae909a2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -53,6 +53,9 @@ public class LimitOperator implements ProcessOperator {
   @Override
   public TsBlock next() {
     TsBlock block = child.next();
+    if (block == null) {
+      return null;
+    }
     TsBlock res = block;
     if (block.getPositionCount() <= remainingLimit) {
       remainingLimit -= block.getPositionCount();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
new file mode 100644
index 0000000000..e7b523afc1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class LinearFillOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final LinearFill[] fillArray;
+  private final Operator child;
+  private final int outputColumnCount;
+  private final LinkedList<TsBlock> cachedTsBlock;
+  private final Column[] cachedFilledValueColumns;
+
+  private int currentFilledColumnIndex;
+
+  public LinearFillOperator(
+      OperatorContext operatorContext, LinearFill[] fillArray, Operator child) {
+    this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+    checkArgument(
+        fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
+    this.fillArray = fillArray;
+    this.child = requireNonNull(child, "child operator is null");
+    this.outputColumnCount = fillArray.length;
+    this.cachedTsBlock = new LinkedList<>();
+    this.cachedFilledValueColumns = new Column[outputColumnCount];
+    this.currentFilledColumnIndex = 0;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+
+    // make sure we call child.next() at most once
+    boolean alreadyCallNext = false;
+    if (cachedTsBlock.isEmpty()) {
+      alreadyCallNext = true;
+      TsBlock nextBlock = child.next();
+      // child operator's calculation is not finished, so we just return null
+      if (nextBlock == null || nextBlock.isEmpty()) {
+        return nextBlock;
+      } else { // otherwise, we cache it
+        cachedTsBlock.addLast(nextBlock);
+      }
+    }
+
+    TsBlock block = cachedTsBlock.getFirst();
+    long currentEndTime = block.getEndTime();
+    // use cached TsBlock to keep filling remaining column
+    while (currentFilledColumnIndex < outputColumnCount) {
+      if (fillArray[currentFilledColumnIndex].needPrepareForNext(
+          currentEndTime, block.getColumn(currentFilledColumnIndex))) {
+        if (!alreadyCallNext) {
+          alreadyCallNext = true;
+          TsBlock nextBlock = child.next();
+          // child operator's calculation is not finished, so we just return null
+          if (nextBlock == null || nextBlock.isEmpty()) {
+            return nextBlock;
+          } else { // otherwise, we cache it
+            cachedTsBlock.addLast(nextBlock);
+          }
+          if (fillArray[currentFilledColumnIndex].prepareForNext(
+              currentEndTime,
+              cachedTsBlock.getLast().getTimeColumn(),
+              cachedTsBlock.getLast().getColumn(currentFilledColumnIndex))) {
+            cachedFilledValueColumns[currentFilledColumnIndex] =
+                fillArray[currentFilledColumnIndex].fill(
+                    block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+          } else { // more TsBlocks is needed to do current column's fill, so current calculation is
+            // not finished, and we just return null
+            return null;
+          }
+        } else { // more TsBlocks is needed to do current column's fill, so current calculation is
+          // not finished, and we just return null
+          return null;
+        }
+      } else {
+        cachedFilledValueColumns[currentFilledColumnIndex] =
+            fillArray[currentFilledColumnIndex].fill(
+                block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+      }
+      currentFilledColumnIndex++;
+    }
+
+    TsBlock originTsBlock = cachedTsBlock.removeFirst();
+    checkArgument(
+        outputColumnCount == originTsBlock.getValueColumnCount(),
+        "outputColumnCount is not equal to value column count of child operator's TsBlock");
+    TsBlock result =
+        TsBlock.wrapBlocksWithoutCopy(
+            originTsBlock.getPositionCount(),
+            originTsBlock.getTimeColumn(),
+            cachedFilledValueColumns);
+    Arrays.fill(cachedFilledValueColumns, null);
+    currentFilledColumnIndex = 0;
+    return result;
+  }
+
+  @Override
+  public boolean hasNext() {
+    boolean hasNext = child.hasNext();
+    if (!hasNext) {
+      for (LinearFill linearFill : fillArray) {
+        linearFill.setNoMoreData();
+      }
+    }
+    return !cachedTsBlock.isEmpty() || hasNext;
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return cachedTsBlock.isEmpty() && child.isFinished();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 04a7915e52..b2136a5b4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -53,6 +53,9 @@ public class OffsetOperator implements ProcessOperator {
   @Override
   public TsBlock next() {
     TsBlock block = child.next();
+    if (block == null) {
+      return null;
+    }
     if (remainingOffset > 0) {
       int offset = Math.min((int) remainingOffset, block.getPositionCount());
       remainingOffset -= offset;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
index b7e92fe864..d43325a21c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
@@ -18,10 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill;
 
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
 public interface IFill {
 
-  Column fill(TsBlock tsBlock);
+  Column fill(Column valueColumn);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
index 9cf6be1ce5..f590adced3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -31,35 +30,31 @@ public class BinaryConstantFill implements IFill {
 
   // fill value
   private final Binary value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn, size of it must be 1
   private final Binary[] valueArray;
 
-  public BinaryConstantFill(Binary value, int columnIndex) {
+  public BinaryConstantFill(Binary value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new Binary[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), valueArray), size);
     } else {
       Binary[] array = new Binary[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getBinary(i);
+          array[i] = valueColumn.getBinary(i);
         }
       }
       return new BinaryColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
index 4d60c2cb60..7604248242 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class BooleanConstantFill implements IFill {
 
   // fill value
   private final boolean value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final boolean[] valueArray;
 
-  public BooleanConstantFill(boolean value, int columnIndex) {
+  public BooleanConstantFill(boolean value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new boolean[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), valueArray), size);
     } else {
       boolean[] array = new boolean[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getBoolean(i);
+          array[i] = valueColumn.getBoolean(i);
         }
       }
       return new BooleanColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
index d3a9244b9c..4615619312 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class DoubleConstantFill implements IFill {
 
   // fill value
   private final double value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final double[] valueArray;
 
-  public DoubleConstantFill(double value, int columnIndex) {
+  public DoubleConstantFill(double value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new double[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), valueArray), size);
     } else {
       double[] array = new double[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getDouble(i);
+          array[i] = valueColumn.getDouble(i);
         }
       }
       return new DoubleColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
index d58addee1f..f7d0dc0f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class FloatConstantFill implements IFill {
 
   // fill value
   private final float value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final float[] valueArray;
 
-  public FloatConstantFill(float value, int columnIndex) {
+  public FloatConstantFill(float value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new float[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), valueArray), size);
     } else {
       float[] array = new float[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getFloat(i);
+          array[i] = valueColumn.getFloat(i);
         }
       }
       return new FloatColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
index 85c21f2e4c..bbe34abf6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class IntConstantFill implements IFill {
 
   // fill value
   private final int value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final int[] valueArray;
 
-  public IntConstantFill(int value, int columnIndex) {
+  public IntConstantFill(int value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new int[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), valueArray), size);
     } else {
       int[] array = new int[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getInt(i);
+          array[i] = valueColumn.getInt(i);
         }
       }
       return new IntColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
index 1aae78a67f..72721980da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class LongConstantFill implements IFill {
 
   // fill value
   private final long value;
-  // index of the column which is need to be filled
-  private final int columnIndex;
   // used for constructing RunLengthEncodedColumn
   private final long[] valueArray;
 
-  public LongConstantFill(long value, int columnIndex) {
+  public LongConstantFill(long value) {
     this.value = value;
-    this.columnIndex = columnIndex;
     this.valueArray = new long[] {value};
   }
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
-      return column;
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), valueArray), size);
     } else {
       long[] array = new long[size];
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           array[i] = value;
         } else {
-          array[i] = column.getLong(i);
+          array[i] = valueColumn.getLong(i);
         }
       }
       return new LongColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
new file mode 100644
index 0000000000..6dd68192c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+
+import java.util.Optional;
+
+public class DoubleLinearFill extends LinearFill {
+
+  // previous value
+  private double previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private double nextValue;
+
+  private double nextValueInCurrentColumn;
+
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((double[]) array)[index] = column.getDouble(index);
+  }
+
+  @Override
+  void fillValue(Object array, int index) {
+    ((double[]) array)[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new double[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return DoubleColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  @Override
+  Column createFilledValueColumn() {
+    double filledValue = getFilledValue();
+    return new DoubleColumn(1, Optional.empty(), new double[] {filledValue});
+  }
+
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new DoubleColumn(size, Optional.empty(), (double[]) array);
+    } else {
+      return new DoubleColumn(size, Optional.of(isNull), (double[]) array);
+    }
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getDouble(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getDouble(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getDouble(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  private double getFilledValue() {
+    return (previousValue + nextValueInCurrentColumn) / 2.0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
new file mode 100644
index 0000000000..2cc80f2fd0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+
+import java.util.Optional;
+
+public class FloatLinearFill extends LinearFill {
+
+  // previous value
+  private float previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private float nextValue;
+
+  private float nextValueInCurrentColumn;
+
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((float[]) array)[index] = column.getFloat(index);
+  }
+
+  @Override
+  void fillValue(Object array, int index) {
+    ((float[]) array)[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new float[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return FloatColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  @Override
+  Column createFilledValueColumn() {
+    float filledValue = getFilledValue();
+    return new FloatColumn(1, Optional.empty(), new float[] {filledValue});
+  }
+
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new FloatColumn(size, Optional.empty(), (float[]) array);
+    } else {
+      return new FloatColumn(size, Optional.of(isNull), (float[]) array);
+    }
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getFloat(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getFloat(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getFloat(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  private float getFilledValue() {
+    return (previousValue + nextValueInCurrentColumn) / 2.0f;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
new file mode 100644
index 0000000000..97ea4d05cc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+
+import java.util.Optional;
+
+public class IntLinearFill extends LinearFill {
+
+  // previous value
+  private int previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private int nextValue;
+
+  private int nextValueInCurrentColumn;
+
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((int[]) array)[index] = column.getInt(index);
+  }
+
+  @Override
+  void fillValue(Object array, int index) {
+    ((int[]) array)[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new int[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return IntColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  @Override
+  Column createFilledValueColumn() {
+    int filledValue = getFilledValue();
+    return new IntColumn(1, Optional.empty(), new int[] {filledValue});
+  }
+
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new IntColumn(size, Optional.empty(), (int[]) array);
+    } else {
+      return new IntColumn(size, Optional.of(isNull), (int[]) array);
+    }
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getInt(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getInt(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getInt(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  private int getFilledValue() {
+    return (previousValue + nextValueInCurrentColumn) / 2;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
new file mode 100644
index 0000000000..4a137c9eba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public abstract class LinearFill {
+
+  // whether previous value is null
+  protected boolean previousIsNull = true;
+  // time of next value
+  protected long nextTime = Long.MIN_VALUE;
+
+  protected long nextTimeInCurrentColumn;
+  // whether next value is null
+  protected boolean noMoreNext = false;
+
+  /**
+   * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
+   * has been set to true
+   *
+   * @param timeColumn TimeColumn of valueColumn
+   * @param valueColumn valueColumn that need to be filled
+   * @return Value Column that has been filled
+   */
+  public Column fill(TimeColumn timeColumn, Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        updatePreviousValue(valueColumn, valueColumn.getPositionCount() - 1);
+      }
+      return valueColumn;
+    }
+
+    // if its values are all null
+    if (valueColumn instanceof RunLengthEncodedColumn) {
+      // previous value is null or next value is null, we just return NULL_VALUE_BLOCK
+      if (previousIsNull || nextTime < timeColumn.getStartTime()) {
+        return new RunLengthEncodedColumn(createNullValueColumn(), size);
+      } else {
+        prepareForNextValueInCurrentColumn(
+            timeColumn.getEndTime(), timeColumn.getPositionCount() - 1, timeColumn, valueColumn);
+        return new RunLengthEncodedColumn(createFilledValueColumn(), size);
+      }
+    } else {
+      Object array = createValueArray(size);
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+
+      for (int i = 0; i < size; i++) {
+        // current value is null, we need to fill it
+        if (valueColumn.isNull(i)) {
+          long currentTime = timeColumn.getLong(i);
+          prepareForNextValueInCurrentColumn(currentTime, i + 1, timeColumn, valueColumn);
+          // we don't fill it, if either previous value or next value is null
+          if (previousIsNull || nextIsNull(currentTime)) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            // fill value using previous and next value
+            fillValue(array, i);
+          }
+        } else { // current is not null
+          // fill value using its own value
+          fillValue(valueColumn, i, array);
+          // update previous value
+          updatePreviousValue(valueColumn, i);
+          previousIsNull = false;
+        }
+      }
+      return createFilledValueColumn(array, isNull, nonNullValue, size);
+    }
+  }
+
+  /**
+   * @param time end time of current valueColumn that need to be filled
+   * @param valueColumn valueColumn that need to be filled
+   * @return true if valueColumn can't be filled using current information, and we need to get next
+   *     TsBlock and then call prepareForNext false if valueColumn can be filled using current
+   *     information, and we can directly call fill() function
+   */
+  public boolean needPrepareForNext(long time, Column valueColumn) {
+    return !noMoreNext && time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+  }
+
+  /**
+   * @param time end time of current valueColumn that need to be filled
+   * @param nextTimeColumn TimeColumn of next TsBlock
+   * @param nextValueColumn Value Column of next TsBlock
+   * @return true if we get enough information to fill current column, and we can stop getting next
+   *     TsBlock and calling prepareForNext false if we still don't get enough information to fill
+   *     current column, and still need to keep getting next TsBlock and then call prepareForNext
+   */
+  public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
+    if (time <= nextTime) {
+      return true;
+    }
+
+    for (int i = 0; i < nextValueColumn.getPositionCount(); i++) {
+      if (!nextValueColumn.isNull(i)) {
+        updateNextValue(nextValueColumn, i);
+        this.nextTime = nextTimeColumn.getLong(i);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** no more next column */
+  public void setNoMoreData() {
+    this.noMoreNext = true;
+  }
+
+  private boolean nextIsNull(long time) {
+    return nextTimeInCurrentColumn <= time;
+  }
+
+  private void prepareForNextValueInCurrentColumn(
+      long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
+    if (time <= nextTimeInCurrentColumn) {
+      return;
+    }
+    for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        this.nextTimeInCurrentColumn = timeColumn.getLong(i);
+        updateNextValueInCurrentColumn(valueColumn, i);
+        return;
+      }
+    }
+
+    // current column's value is not enough for filling, we should use value of next Column
+    this.nextTimeInCurrentColumn = this.nextTime;
+    updateNextValueInCurrentColumn();
+  }
+
+  abstract void fillValue(Column column, int index, Object array);
+
+  abstract void fillValue(Object array, int index);
+
+  abstract Object createValueArray(int size);
+
+  abstract Column createNullValueColumn();
+
+  abstract Column createFilledValueColumn();
+
+  abstract Column createFilledValueColumn(
+      Object array, boolean[] isNull, boolean nonNullValue, int size);
+
+  abstract void updatePreviousValue(Column column, int index);
+
+  abstract void updateNextValue(Column nextValueColumn, int index);
+
+  abstract void updateNextValueInCurrentColumn(Column nextValueColumn, int index);
+
+  /** update nextValueInCurrentColumn using value of next Column */
+  abstract void updateNextValueInCurrentColumn();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
new file mode 100644
index 0000000000..dd0e3e75fb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+
+import java.util.Optional;
+
+public class LongLinearFill extends LinearFill {
+
+  // previous value
+  private long previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private long nextValue;
+
+  private long nextValueInCurrentColumn;
+
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((long[]) array)[index] = column.getLong(index);
+  }
+
+  @Override
+  void fillValue(Object array, int index) {
+    ((long[]) array)[index] = getFilledValue();
+  }
+
+  @Override
+  Object createValueArray(int size) {
+    return new long[size];
+  }
+
+  @Override
+  Column createNullValueColumn() {
+    return LongColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  @Override
+  Column createFilledValueColumn() {
+    long filledValue = getFilledValue();
+    return new LongColumn(1, Optional.empty(), new long[] {filledValue});
+  }
+
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new LongColumn(size, Optional.empty(), (long[]) array);
+    } else {
+      return new LongColumn(size, Optional.of(isNull), (long[]) array);
+    }
+  }
+
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getLong(index);
+  }
+
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getLong(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getLong(index);
+  }
+
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  private long getFilledValue() {
+    return (previousValue + nextValueInCurrentColumn) / 2L;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
index 897cd21d7a..e6cbf22b4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -33,35 +32,25 @@ public class BinaryPreviousFill implements IFill {
   // previous value
   private Binary value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public BinaryPreviousFill(Binary value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getBinary(size - 1);
+        value = valueColumn.getBinary(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(BinaryColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getBinary(size - 1);
         return new RunLengthEncodedColumn(
             new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
       }
@@ -71,7 +60,7 @@ public class BinaryPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -79,7 +68,7 @@ public class BinaryPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getBinary(i);
+          array[i] = valueColumn.getBinary(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
index a1f3c5a9e7..a7a1da249c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -32,35 +31,25 @@ public class BooleanPreviousFill implements IFill {
   // previous value
   private boolean value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public BooleanPreviousFill(boolean value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getBoolean(size - 1);
+        value = valueColumn.getBoolean(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(BooleanColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getBoolean(size - 1);
         return new RunLengthEncodedColumn(
             new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class BooleanPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class BooleanPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getBoolean(i);
+          array[i] = valueColumn.getBoolean(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
index e113ffffd5..fd0e1892ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -32,35 +31,25 @@ public class DoublePreviousFill implements IFill {
   // previous value
   private double value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public DoublePreviousFill(double value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getDouble(size - 1);
+        value = valueColumn.getDouble(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(DoubleColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getDouble(size - 1);
         return new RunLengthEncodedColumn(
             new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class DoublePreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class DoublePreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getDouble(i);
+          array[i] = valueColumn.getDouble(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
index d482430a6d..9c8f221f72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -32,35 +31,25 @@ public class FloatPreviousFill implements IFill {
   // previous value
   private float value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public FloatPreviousFill(float value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getFloat(size - 1);
+        value = valueColumn.getFloat(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(FloatColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getFloat(size - 1);
         return new RunLengthEncodedColumn(
             new FloatColumn(1, Optional.empty(), new float[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class FloatPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class FloatPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getFloat(i);
+          array[i] = valueColumn.getFloat(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
index 6e95f028b1..a444dedc05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -32,35 +31,25 @@ public class IntPreviousFill implements IFill {
   // previous value
   private int value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public IntPreviousFill(int value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getInt(size - 1);
+        value = valueColumn.getInt(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(IntColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getInt(size - 1);
         return new RunLengthEncodedColumn(
             new IntColumn(1, Optional.empty(), new int[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class IntPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class IntPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getInt(i);
+          array[i] = valueColumn.getInt(i);
           value = array[i];
           previousIsNull = false;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
index 116f616aab..c145cc007f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
 
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -32,35 +31,25 @@ public class LongPreviousFill implements IFill {
   // previous value
   private long value;
   // whether previous value is null
-  private boolean previousIsNull;
-  // index of the column which is need to be filled
-  private final int columnIndex;
-
-  public LongPreviousFill(long value, int columnIndex) {
-    this.value = value;
-    this.columnIndex = columnIndex;
-  }
+  private boolean previousIsNull = true;
 
   @Override
-  public Column fill(TsBlock tsBlock) {
-    Column column = tsBlock.getColumn(columnIndex);
-    int size = column.getPositionCount();
-    // if this column doesn't have any null value, or it's empty, just return itself;
-    if (!column.mayHaveNull() || size == 0) {
+  public Column fill(Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
       if (size != 0) {
         previousIsNull = false;
         // update the value using last non-null value
-        value = column.getLong(size - 1);
+        value = valueColumn.getLong(size - 1);
       }
-      return column;
+      return valueColumn;
     }
     // if its values are all null
-    if (column instanceof RunLengthEncodedColumn) {
+    if (valueColumn instanceof RunLengthEncodedColumn) {
       if (previousIsNull) {
         return new RunLengthEncodedColumn(LongColumnBuilder.NULL_VALUE_BLOCK, size);
       } else {
-        // update the value using last non-null value
-        value = column.getLong(size - 1);
         return new RunLengthEncodedColumn(
             new LongColumn(1, Optional.empty(), new long[] {value}), size);
       }
@@ -70,7 +59,7 @@ public class LongPreviousFill implements IFill {
       // have no null value
       boolean nonNullValue = true;
       for (int i = 0; i < size; i++) {
-        if (column.isNull(i)) {
+        if (valueColumn.isNull(i)) {
           if (previousIsNull) {
             isNull[i] = true;
             nonNullValue = false;
@@ -78,7 +67,7 @@ public class LongPreviousFill implements IFill {
             array[i] = value;
           }
         } else {
-          array[i] = column.getLong(i);
+          array[i] = valueColumn.getLong(i);
           value = array[i];
           previousIsNull = false;
         }


[iotdb] 01/03: [IOTDB-3038] Implementation of fill operator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56fdd47cd840bf1b66b920477c297c05bd198e26
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri May 6 16:26:33 2022 +0800

    [IOTDB-3038] Implementation of fill operator
---
 .../mpp/execution/operator/process/fill/IFill.java | 27 +++++++
 .../process/fill/constant/BinaryConstantFill.java  | 68 ++++++++++++++++
 .../process/fill/constant/BooleanConstantFill.java | 67 ++++++++++++++++
 .../process/fill/constant/DoubleConstantFill.java  | 67 ++++++++++++++++
 .../process/fill/constant/FloatConstantFill.java   | 67 ++++++++++++++++
 .../process/fill/constant/IntConstantFill.java     | 67 ++++++++++++++++
 .../process/fill/constant/LongConstantFill.java    | 67 ++++++++++++++++
 .../process/fill/previous/BinaryPreviousFill.java  | 93 ++++++++++++++++++++++
 .../process/fill/previous/BooleanPreviousFill.java | 92 +++++++++++++++++++++
 .../process/fill/previous/DoublePreviousFill.java  | 92 +++++++++++++++++++++
 .../process/fill/previous/FloatPreviousFill.java   | 92 +++++++++++++++++++++
 .../process/fill/previous/IntPreviousFill.java     | 92 +++++++++++++++++++++
 .../process/fill/previous/LongPreviousFill.java    | 92 +++++++++++++++++++++
 13 files changed, 983 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
new file mode 100644
index 0000000000..b7e92fe864
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public interface IFill {
+
+  Column fill(TsBlock tsBlock);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
new file mode 100644
index 0000000000..9cf6be1ce5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Optional;
+
+public class BinaryConstantFill implements IFill {
+
+  // fill value
+  private final Binary value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn, size of it must be 1
+  private final Binary[] valueArray;
+
+  public BinaryConstantFill(Binary value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new Binary[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      Binary[] array = new Binary[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getBinary(i);
+        }
+      }
+      return new BinaryColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
new file mode 100644
index 0000000000..4d60c2cb60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class BooleanConstantFill implements IFill {
+
+  // fill value
+  private final boolean value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final boolean[] valueArray;
+
+  public BooleanConstantFill(boolean value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new boolean[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      boolean[] array = new boolean[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getBoolean(i);
+        }
+      }
+      return new BooleanColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
new file mode 100644
index 0000000000..d3a9244b9c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class DoubleConstantFill implements IFill {
+
+  // fill value
+  private final double value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final double[] valueArray;
+
+  public DoubleConstantFill(double value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new double[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      double[] array = new double[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getDouble(i);
+        }
+      }
+      return new DoubleColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
new file mode 100644
index 0000000000..d58addee1f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class FloatConstantFill implements IFill {
+
+  // fill value
+  private final float value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final float[] valueArray;
+
+  public FloatConstantFill(float value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new float[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      float[] array = new float[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getFloat(i);
+        }
+      }
+      return new FloatColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
new file mode 100644
index 0000000000..85c21f2e4c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class IntConstantFill implements IFill {
+
+  // fill value
+  private final int value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final int[] valueArray;
+
+  public IntConstantFill(int value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new int[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      int[] array = new int[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getInt(i);
+        }
+      }
+      return new IntColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
new file mode 100644
index 0000000000..1aae78a67f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class LongConstantFill implements IFill {
+
+  // fill value
+  private final long value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final long[] valueArray;
+
+  public LongConstantFill(long value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new long[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      long[] array = new long[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getLong(i);
+        }
+      }
+      return new LongColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
new file mode 100644
index 0000000000..855d90585e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Optional;
+
+public class BinaryPreviousFill implements IFill {
+
+  // previous value
+  private Binary value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public BinaryPreviousFill(Binary value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getBinary(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(BinaryColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getBinary(size - 1);
+        return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
+      }
+    } else {
+      Binary[] array = new Binary[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getBinary(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new BinaryColumn(size, Optional.empty(), array);
+      } else {
+        return new BinaryColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
new file mode 100644
index 0000000000..766b6296af
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class BooleanPreviousFill implements IFill {
+
+  // previous value
+  private boolean value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public BooleanPreviousFill(boolean value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getBoolean(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(BooleanColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getBoolean(size - 1);
+        return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
+      }
+    } else {
+      boolean[] array = new boolean[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getBoolean(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new BooleanColumn(size, Optional.empty(), array);
+      } else {
+        return new BooleanColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
new file mode 100644
index 0000000000..a5bc6aeaf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class DoublePreviousFill implements IFill {
+
+  // previous value
+  private double value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public DoublePreviousFill(double value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getDouble(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(DoubleColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getDouble(size - 1);
+        return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
+      }
+    } else {
+      double[] array = new double[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getDouble(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new DoubleColumn(size, Optional.empty(), array);
+      } else {
+        return new DoubleColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
new file mode 100644
index 0000000000..e80a85b8ad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class FloatPreviousFill implements IFill {
+
+  // previous value
+  private float value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public FloatPreviousFill(float value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getFloat(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(FloatColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getFloat(size - 1);
+        return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), new float[] {value}), size);
+      }
+    } else {
+      float[] array = new float[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getFloat(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new FloatColumn(size, Optional.empty(), array);
+      } else {
+        return new FloatColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
new file mode 100644
index 0000000000..0b4686c9a2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class IntPreviousFill implements IFill {
+
+  // previous value
+  private int value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public IntPreviousFill(int value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getInt(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(IntColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getInt(size - 1);
+        return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), new int[] {value}), size);
+      }
+    } else {
+      int[] array = new int[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getInt(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new IntColumn(size, Optional.empty(), array);
+      } else {
+        return new IntColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
new file mode 100644
index 0000000000..ca41850020
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class LongPreviousFill implements IFill {
+
+  // previous value
+  private long value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public LongPreviousFill(long value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getLong(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(LongColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getLong(size - 1);
+        return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), new long[] {value}), size);
+      }
+    } else {
+      long[] array = new long[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getLong(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new LongColumn(size, Optional.empty(), array);
+      } else {
+        return new LongColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}


[iotdb] 02/03: spotless

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d2fe7abbea01931902fb393b1f497904f025da59
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri May 6 16:27:17 2022 +0800

    spotless
---
 .../execution/operator/process/fill/previous/BinaryPreviousFill.java   | 3 ++-
 .../execution/operator/process/fill/previous/BooleanPreviousFill.java  | 3 ++-
 .../execution/operator/process/fill/previous/DoublePreviousFill.java   | 3 ++-
 .../execution/operator/process/fill/previous/FloatPreviousFill.java    | 3 ++-
 .../mpp/execution/operator/process/fill/previous/IntPreviousFill.java  | 3 ++-
 .../mpp/execution/operator/process/fill/previous/LongPreviousFill.java | 3 ++-
 6 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
index 855d90585e..897cd21d7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
@@ -62,7 +62,8 @@ public class BinaryPreviousFill implements IFill {
       } else {
         // update the value using last non-null value
         value = column.getBinary(size - 1);
-        return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
+        return new RunLengthEncodedColumn(
+            new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
       }
     } else {
       Binary[] array = new Binary[size];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
index 766b6296af..a1f3c5a9e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
@@ -61,7 +61,8 @@ public class BooleanPreviousFill implements IFill {
       } else {
         // update the value using last non-null value
         value = column.getBoolean(size - 1);
-        return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
+        return new RunLengthEncodedColumn(
+            new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
       }
     } else {
       boolean[] array = new boolean[size];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
index a5bc6aeaf7..e113ffffd5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
@@ -61,7 +61,8 @@ public class DoublePreviousFill implements IFill {
       } else {
         // update the value using last non-null value
         value = column.getDouble(size - 1);
-        return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
+        return new RunLengthEncodedColumn(
+            new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
       }
     } else {
       double[] array = new double[size];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
index e80a85b8ad..d482430a6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
@@ -61,7 +61,8 @@ public class FloatPreviousFill implements IFill {
       } else {
         // update the value using last non-null value
         value = column.getFloat(size - 1);
-        return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), new float[] {value}), size);
+        return new RunLengthEncodedColumn(
+            new FloatColumn(1, Optional.empty(), new float[] {value}), size);
       }
     } else {
       float[] array = new float[size];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
index 0b4686c9a2..6e95f028b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
@@ -61,7 +61,8 @@ public class IntPreviousFill implements IFill {
       } else {
         // update the value using last non-null value
         value = column.getInt(size - 1);
-        return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), new int[] {value}), size);
+        return new RunLengthEncodedColumn(
+            new IntColumn(1, Optional.empty(), new int[] {value}), size);
       }
     } else {
       int[] array = new int[size];
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
index ca41850020..116f616aab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
@@ -61,7 +61,8 @@ public class LongPreviousFill implements IFill {
       } else {
         // update the value using last non-null value
         value = column.getLong(size - 1);
-        return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), new long[] {value}), size);
+        return new RunLengthEncodedColumn(
+            new LongColumn(1, Optional.empty(), new long[] {value}), size);
       }
     } else {
       long[] array = new long[size];