You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/07 12:28:46 UTC

[iotdb] 01/03: [IOTDB-3038] Implementation of fill operator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 56fdd47cd840bf1b66b920477c297c05bd198e26
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri May 6 16:26:33 2022 +0800

    [IOTDB-3038] Implementation of fill operator
---
 .../mpp/execution/operator/process/fill/IFill.java | 27 +++++++
 .../process/fill/constant/BinaryConstantFill.java  | 68 ++++++++++++++++
 .../process/fill/constant/BooleanConstantFill.java | 67 ++++++++++++++++
 .../process/fill/constant/DoubleConstantFill.java  | 67 ++++++++++++++++
 .../process/fill/constant/FloatConstantFill.java   | 67 ++++++++++++++++
 .../process/fill/constant/IntConstantFill.java     | 67 ++++++++++++++++
 .../process/fill/constant/LongConstantFill.java    | 67 ++++++++++++++++
 .../process/fill/previous/BinaryPreviousFill.java  | 93 ++++++++++++++++++++++
 .../process/fill/previous/BooleanPreviousFill.java | 92 +++++++++++++++++++++
 .../process/fill/previous/DoublePreviousFill.java  | 92 +++++++++++++++++++++
 .../process/fill/previous/FloatPreviousFill.java   | 92 +++++++++++++++++++++
 .../process/fill/previous/IntPreviousFill.java     | 92 +++++++++++++++++++++
 .../process/fill/previous/LongPreviousFill.java    | 92 +++++++++++++++++++++
 13 files changed, 983 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
new file mode 100644
index 0000000000..b7e92fe864
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public interface IFill {
+
+  Column fill(TsBlock tsBlock);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
new file mode 100644
index 0000000000..9cf6be1ce5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Optional;
+
+public class BinaryConstantFill implements IFill {
+
+  // fill value
+  private final Binary value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn, size of it must be 1
+  private final Binary[] valueArray;
+
+  public BinaryConstantFill(Binary value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new Binary[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      Binary[] array = new Binary[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getBinary(i);
+        }
+      }
+      return new BinaryColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
new file mode 100644
index 0000000000..4d60c2cb60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class BooleanConstantFill implements IFill {
+
+  // fill value
+  private final boolean value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final boolean[] valueArray;
+
+  public BooleanConstantFill(boolean value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new boolean[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      boolean[] array = new boolean[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getBoolean(i);
+        }
+      }
+      return new BooleanColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
new file mode 100644
index 0000000000..d3a9244b9c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class DoubleConstantFill implements IFill {
+
+  // fill value
+  private final double value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final double[] valueArray;
+
+  public DoubleConstantFill(double value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new double[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      double[] array = new double[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getDouble(i);
+        }
+      }
+      return new DoubleColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
new file mode 100644
index 0000000000..d58addee1f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class FloatConstantFill implements IFill {
+
+  // fill value
+  private final float value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final float[] valueArray;
+
+  public FloatConstantFill(float value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new float[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      float[] array = new float[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getFloat(i);
+        }
+      }
+      return new FloatColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
new file mode 100644
index 0000000000..85c21f2e4c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class IntConstantFill implements IFill {
+
+  // fill value
+  private final int value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final int[] valueArray;
+
+  public IntConstantFill(int value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new int[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      int[] array = new int[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getInt(i);
+        }
+      }
+      return new IntColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
new file mode 100644
index 0000000000..1aae78a67f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class LongConstantFill implements IFill {
+
+  // fill value
+  private final long value;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+  // used for constructing RunLengthEncodedColumn
+  private final long[] valueArray;
+
+  public LongConstantFill(long value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new long[] {value};
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), valueArray), size);
+    } else {
+      long[] array = new long[size];
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          array[i] = value;
+        } else {
+          array[i] = column.getLong(i);
+        }
+      }
+      return new LongColumn(size, Optional.empty(), array);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
new file mode 100644
index 0000000000..855d90585e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Optional;
+
+public class BinaryPreviousFill implements IFill {
+
+  // previous value
+  private Binary value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public BinaryPreviousFill(Binary value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getBinary(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(BinaryColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getBinary(size - 1);
+        return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
+      }
+    } else {
+      Binary[] array = new Binary[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getBinary(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new BinaryColumn(size, Optional.empty(), array);
+      } else {
+        return new BinaryColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
new file mode 100644
index 0000000000..766b6296af
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class BooleanPreviousFill implements IFill {
+
+  // previous value
+  private boolean value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public BooleanPreviousFill(boolean value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getBoolean(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(BooleanColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getBoolean(size - 1);
+        return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
+      }
+    } else {
+      boolean[] array = new boolean[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getBoolean(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new BooleanColumn(size, Optional.empty(), array);
+      } else {
+        return new BooleanColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
new file mode 100644
index 0000000000..a5bc6aeaf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class DoublePreviousFill implements IFill {
+
+  // previous value
+  private double value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public DoublePreviousFill(double value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getDouble(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(DoubleColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getDouble(size - 1);
+        return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
+      }
+    } else {
+      double[] array = new double[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getDouble(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new DoubleColumn(size, Optional.empty(), array);
+      } else {
+        return new DoubleColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
new file mode 100644
index 0000000000..e80a85b8ad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class FloatPreviousFill implements IFill {
+
+  // previous value
+  private float value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public FloatPreviousFill(float value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getFloat(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(FloatColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getFloat(size - 1);
+        return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), new float[] {value}), size);
+      }
+    } else {
+      float[] array = new float[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getFloat(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new FloatColumn(size, Optional.empty(), array);
+      } else {
+        return new FloatColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
new file mode 100644
index 0000000000..0b4686c9a2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class IntPreviousFill implements IFill {
+
+  // previous value
+  private int value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public IntPreviousFill(int value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getInt(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(IntColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getInt(size - 1);
+        return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), new int[] {value}), size);
+      }
+    } else {
+      int[] array = new int[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getInt(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new IntColumn(size, Optional.empty(), array);
+      } else {
+        return new IntColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
new file mode 100644
index 0000000000..ca41850020
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class LongPreviousFill implements IFill {
+
+  // previous value
+  private long value;
+  // whether previous value is null
+  private boolean previousIsNull;
+  // index of the column which is need to be filled
+  private final int columnIndex;
+
+  public LongPreviousFill(long value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // if this column doesn't have any null value, or it's empty, just return itself;
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // update the value using last non-null value
+        value = column.getLong(size - 1);
+      }
+      return column;
+    }
+    // if its values are all null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(LongColumnBuilder.NULL_VALUE_BLOCK, size);
+      } else {
+        // update the value using last non-null value
+        value = column.getLong(size - 1);
+        return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), new long[] {value}), size);
+      }
+    } else {
+      long[] array = new long[size];
+      boolean[] isNull = new boolean[size];
+      // have no null value
+      boolean nonNullValue = true;
+      for (int i = 0; i < size; i++) {
+        if (column.isNull(i)) {
+          if (previousIsNull) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            array[i] = value;
+          }
+        } else {
+          array[i] = column.getLong(i);
+          value = array[i];
+          previousIsNull = false;
+        }
+      }
+      if (nonNullValue) {
+        return new LongColumn(size, Optional.empty(), array);
+      } else {
+        return new LongColumn(size, Optional.of(isNull), array);
+      }
+    }
+  }
+}