You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/07 12:28:46 UTC
[iotdb] 01/03: [IOTDB-3038] Implementation of fill operator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56fdd47cd840bf1b66b920477c297c05bd198e26
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Fri May 6 16:26:33 2022 +0800
[IOTDB-3038] Implementation of fill operator
---
.../mpp/execution/operator/process/fill/IFill.java | 27 +++++++
.../process/fill/constant/BinaryConstantFill.java | 68 ++++++++++++++++
.../process/fill/constant/BooleanConstantFill.java | 67 ++++++++++++++++
.../process/fill/constant/DoubleConstantFill.java | 67 ++++++++++++++++
.../process/fill/constant/FloatConstantFill.java | 67 ++++++++++++++++
.../process/fill/constant/IntConstantFill.java | 67 ++++++++++++++++
.../process/fill/constant/LongConstantFill.java | 67 ++++++++++++++++
.../process/fill/previous/BinaryPreviousFill.java | 93 ++++++++++++++++++++++
.../process/fill/previous/BooleanPreviousFill.java | 92 +++++++++++++++++++++
.../process/fill/previous/DoublePreviousFill.java | 92 +++++++++++++++++++++
.../process/fill/previous/FloatPreviousFill.java | 92 +++++++++++++++++++++
.../process/fill/previous/IntPreviousFill.java | 92 +++++++++++++++++++++
.../process/fill/previous/LongPreviousFill.java | 92 +++++++++++++++++++++
13 files changed, 983 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
new file mode 100644
index 0000000000..b7e92fe864
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public interface IFill {
+ // Returns the column this filler targets in tsBlock, with nulls filled per the implementation.
+ Column fill(TsBlock tsBlock);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
new file mode 100644
index 0000000000..9cf6be1ce5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Optional;
+
+public class BinaryConstantFill implements IFill {
+
+  // constant substituted for every null position
+  private final Binary value;
+  // index, within the incoming TsBlock, of the column that needs to be filled
+  private final int columnIndex;
+  // one-element array wrapping the constant, reused to build RunLengthEncodedColumn results
+  private final Binary[] valueArray;
+
+  public BinaryConstantFill(Binary value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new Binary[] {value};
+  }
+
+  /**
+   * Returns the target column with every null replaced by the constant. A column that is empty
+   * or reports that it cannot contain nulls is returned as the very same instance.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column source = tsBlock.getColumn(columnIndex);
+    int positionCount = source.getPositionCount();
+    if (!source.mayHaveNull() || positionCount == 0) {
+      return source;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (source instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          new BinaryColumn(1, Optional.empty(), valueArray), positionCount);
+    }
+    // otherwise copy position by position, swapping each null for the constant
+    Binary[] filled = new Binary[positionCount];
+    for (int pos = 0; pos < positionCount; pos++) {
+      filled[pos] = source.isNull(pos) ? value : source.getBinary(pos);
+    }
+    return new BinaryColumn(positionCount, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
new file mode 100644
index 0000000000..4d60c2cb60
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class BooleanConstantFill implements IFill {
+
+  // constant substituted for every null position
+  private final boolean value;
+  // index, within the incoming TsBlock, of the column that needs to be filled
+  private final int columnIndex;
+  // one-element array wrapping the constant, reused to build RunLengthEncodedColumn results
+  private final boolean[] valueArray;
+
+  public BooleanConstantFill(boolean value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new boolean[] {value};
+  }
+
+  /**
+   * Returns the target column with every null replaced by the constant. A column that is empty
+   * or reports that it cannot contain nulls is returned as the very same instance.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column source = tsBlock.getColumn(columnIndex);
+    int positionCount = source.getPositionCount();
+    if (!source.mayHaveNull() || positionCount == 0) {
+      return source;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (source instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          new BooleanColumn(1, Optional.empty(), valueArray), positionCount);
+    }
+    // otherwise copy position by position, swapping each null for the constant
+    boolean[] filled = new boolean[positionCount];
+    for (int pos = 0; pos < positionCount; pos++) {
+      filled[pos] = source.isNull(pos) ? value : source.getBoolean(pos);
+    }
+    return new BooleanColumn(positionCount, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
new file mode 100644
index 0000000000..d3a9244b9c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class DoubleConstantFill implements IFill {
+
+  // constant substituted for every null position
+  private final double value;
+  // index, within the incoming TsBlock, of the column that needs to be filled
+  private final int columnIndex;
+  // one-element array wrapping the constant, reused to build RunLengthEncodedColumn results
+  private final double[] valueArray;
+
+  public DoubleConstantFill(double value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new double[] {value};
+  }
+
+  /**
+   * Returns the target column with every null replaced by the constant. A column that is empty
+   * or reports that it cannot contain nulls is returned as the very same instance.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column source = tsBlock.getColumn(columnIndex);
+    int positionCount = source.getPositionCount();
+    if (!source.mayHaveNull() || positionCount == 0) {
+      return source;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (source instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          new DoubleColumn(1, Optional.empty(), valueArray), positionCount);
+    }
+    // otherwise copy position by position, swapping each null for the constant
+    double[] filled = new double[positionCount];
+    for (int pos = 0; pos < positionCount; pos++) {
+      filled[pos] = source.isNull(pos) ? value : source.getDouble(pos);
+    }
+    return new DoubleColumn(positionCount, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
new file mode 100644
index 0000000000..d58addee1f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class FloatConstantFill implements IFill {
+
+  // constant substituted for every null position
+  private final float value;
+  // index, within the incoming TsBlock, of the column that needs to be filled
+  private final int columnIndex;
+  // one-element array wrapping the constant, reused to build RunLengthEncodedColumn results
+  private final float[] valueArray;
+
+  public FloatConstantFill(float value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new float[] {value};
+  }
+
+  /**
+   * Returns the target column with every null replaced by the constant. A column that is empty
+   * or reports that it cannot contain nulls is returned as the very same instance.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column source = tsBlock.getColumn(columnIndex);
+    int positionCount = source.getPositionCount();
+    if (!source.mayHaveNull() || positionCount == 0) {
+      return source;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (source instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          new FloatColumn(1, Optional.empty(), valueArray), positionCount);
+    }
+    // otherwise copy position by position, swapping each null for the constant
+    float[] filled = new float[positionCount];
+    for (int pos = 0; pos < positionCount; pos++) {
+      filled[pos] = source.isNull(pos) ? value : source.getFloat(pos);
+    }
+    return new FloatColumn(positionCount, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
new file mode 100644
index 0000000000..85c21f2e4c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class IntConstantFill implements IFill {
+
+  // constant substituted for every null position
+  private final int value;
+  // index, within the incoming TsBlock, of the column that needs to be filled
+  private final int columnIndex;
+  // one-element array wrapping the constant, reused to build RunLengthEncodedColumn results
+  private final int[] valueArray;
+
+  public IntConstantFill(int value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new int[] {value};
+  }
+
+  /**
+   * Returns the target column with every null replaced by the constant. A column that is empty
+   * or reports that it cannot contain nulls is returned as the very same instance.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column source = tsBlock.getColumn(columnIndex);
+    int positionCount = source.getPositionCount();
+    if (!source.mayHaveNull() || positionCount == 0) {
+      return source;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (source instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          new IntColumn(1, Optional.empty(), valueArray), positionCount);
+    }
+    // otherwise copy position by position, swapping each null for the constant
+    int[] filled = new int[positionCount];
+    for (int pos = 0; pos < positionCount; pos++) {
+      filled[pos] = source.isNull(pos) ? value : source.getInt(pos);
+    }
+    return new IntColumn(positionCount, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
new file mode 100644
index 0000000000..1aae78a67f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class LongConstantFill implements IFill {
+
+  // constant substituted for every null position
+  private final long value;
+  // index, within the incoming TsBlock, of the column that needs to be filled
+  private final int columnIndex;
+  // one-element array wrapping the constant, reused to build RunLengthEncodedColumn results
+  private final long[] valueArray;
+
+  public LongConstantFill(long value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+    this.valueArray = new long[] {value};
+  }
+
+  /**
+   * Returns the target column with every null replaced by the constant. A column that is empty
+   * or reports that it cannot contain nulls is returned as the very same instance.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column source = tsBlock.getColumn(columnIndex);
+    int positionCount = source.getPositionCount();
+    if (!source.mayHaveNull() || positionCount == 0) {
+      return source;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (source instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          new LongColumn(1, Optional.empty(), valueArray), positionCount);
+    }
+    // otherwise copy position by position, swapping each null for the constant
+    long[] filled = new long[positionCount];
+    for (int pos = 0; pos < positionCount; pos++) {
+      filled[pos] = source.isNull(pos) ? value : source.getLong(pos);
+    }
+    return new LongColumn(positionCount, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
new file mode 100644
index 0000000000..855d90585e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.Optional;
+
+public class BinaryPreviousFill implements IFill {
+
+  // most recent non-null value seen so far; kept between fill() calls
+  private Binary value;
+  // true while no previous value exists; starts false, so the constructor value is used first
+  private boolean previousIsNull;
+  // index of the column that needs to be filled
+  private final int columnIndex;
+
+  public BinaryPreviousFill(Binary value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  /**
+   * Fills nulls with the nearest preceding non-null value, carrying that value across calls.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // no nulls or empty: return the column itself, remembering its last value when it has one
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        value = column.getBinary(size - 1);
+      }
+      return column;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(BinaryColumnBuilder.NULL_VALUE_BLOCK, size);
+      }
+      // Every position is null, so there is nothing newer to remember: keep the cached value
+      // untouched (reading from this column would replace it with a null slot's content).
+      return new RunLengthEncodedColumn(
+          new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
+    }
+    Binary[] array = new Binary[size];
+    boolean[] isNull = new boolean[size];
+    // stays true as long as every output position ends up with a value
+    boolean nonNullValue = true;
+    for (int i = 0; i < size; i++) {
+      if (column.isNull(i)) {
+        if (previousIsNull) {
+          isNull[i] = true;
+          nonNullValue = false;
+        } else {
+          array[i] = value;
+        }
+      } else {
+        array[i] = column.getBinary(i);
+        value = array[i];
+        previousIsNull = false;
+      }
+    }
+    if (nonNullValue) {
+      return new BinaryColumn(size, Optional.empty(), array);
+    }
+    return new BinaryColumn(size, Optional.of(isNull), array);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
new file mode 100644
index 0000000000..766b6296af
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class BooleanPreviousFill implements IFill {
+
+  // most recent non-null value seen so far; kept between fill() calls
+  private boolean value;
+  // true while no previous value exists; starts false, so the constructor value is used first
+  private boolean previousIsNull;
+  // index of the column that needs to be filled
+  private final int columnIndex;
+
+  public BooleanPreviousFill(boolean value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  /**
+   * Fills nulls with the nearest preceding non-null value, carrying that value across calls.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // no nulls or empty: return the column itself, remembering its last value when it has one
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        value = column.getBoolean(size - 1);
+      }
+      return column;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(BooleanColumnBuilder.NULL_VALUE_BLOCK, size);
+      }
+      // Every position is null, so there is nothing newer to remember: keep the cached value
+      // untouched (reading from this column would replace it with a null slot's content).
+      return new RunLengthEncodedColumn(
+          new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
+    }
+    boolean[] array = new boolean[size];
+    boolean[] isNull = new boolean[size];
+    // stays true as long as every output position ends up with a value
+    boolean nonNullValue = true;
+    for (int i = 0; i < size; i++) {
+      if (column.isNull(i)) {
+        if (previousIsNull) {
+          isNull[i] = true;
+          nonNullValue = false;
+        } else {
+          array[i] = value;
+        }
+      } else {
+        array[i] = column.getBoolean(i);
+        value = array[i];
+        previousIsNull = false;
+      }
+    }
+    if (nonNullValue) {
+      return new BooleanColumn(size, Optional.empty(), array);
+    }
+    return new BooleanColumn(size, Optional.of(isNull), array);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
new file mode 100644
index 0000000000..a5bc6aeaf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+public class DoublePreviousFill implements IFill {
+
+  // most recent non-null value seen so far; kept between fill() calls
+  private double value;
+  // true while no previous value exists; starts false, so the constructor value is used first
+  private boolean previousIsNull;
+  // index of the column that needs to be filled
+  private final int columnIndex;
+
+  public DoublePreviousFill(double value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  /**
+   * Fills nulls with the nearest preceding non-null value, carrying that value across calls.
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    // no nulls or empty: return the column itself, remembering its last value when it has one
+    if (!column.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        value = column.getDouble(size - 1);
+      }
+      return column;
+    }
+    // a run-length encoded column that reaches this point is treated as all-null
+    if (column instanceof RunLengthEncodedColumn) {
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(DoubleColumnBuilder.NULL_VALUE_BLOCK, size);
+      }
+      // Every position is null, so there is nothing newer to remember: keep the cached value
+      // untouched (reading from this column would replace it with a null slot's content).
+      return new RunLengthEncodedColumn(
+          new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
+    }
+    double[] array = new double[size];
+    boolean[] isNull = new boolean[size];
+    // stays true as long as every output position ends up with a value
+    boolean nonNullValue = true;
+    for (int i = 0; i < size; i++) {
+      if (column.isNull(i)) {
+        if (previousIsNull) {
+          isNull[i] = true;
+          nonNullValue = false;
+        } else {
+          array[i] = value;
+        }
+      } else {
+        array[i] = column.getDouble(i);
+        value = array[i];
+        previousIsNull = false;
+      }
+    }
+    if (nonNullValue) {
+      return new DoubleColumn(size, Optional.empty(), array);
+    }
+    return new DoubleColumn(size, Optional.of(isNull), array);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
new file mode 100644
index 0000000000..e80a85b8ad
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+/**
+ * {@link IFill} implementation that replaces null positions of one float column with the most
+ * recent non-null value. The remembered value lives in this object, so it carries over from one
+ * call of {@link #fill(TsBlock)} to the next.
+ */
+public class FloatPreviousFill implements IFill {
+
+  // most recent non-null value; seeded by the constructor
+  private float value;
+  // whether there is no previous value to fill with
+  // NOTE(review): nothing in this class ever sets this to true, so the branches that keep
+  // positions null cannot currently be reached - confirm whether that is intended.
+  private boolean previousIsNull;
+  // index of the column that needs to be filled
+  private final int columnIndex;
+
+  /**
+   * @param value value used to fill nulls that appear before any non-null value has been seen
+   * @param columnIndex index, within each TsBlock, of the column to fill
+   */
+  public FloatPreviousFill(float value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  /**
+   * Fills the configured column of {@code tsBlock} and updates the remembered value.
+   *
+   * @param tsBlock block that holds the column to fill
+   * @return the column itself when it is empty or reports no nulls, otherwise a new column in
+   *     which each null position is replaced by the latest preceding non-null value (or stays
+   *     null while no such value is known)
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    if (size == 0) {
+      return column;
+    }
+    if (!column.mayHaveNull()) {
+      // nothing to fill; only remember the last value for the following block
+      previousIsNull = false;
+      value = column.getFloat(size - 1);
+      return column;
+    }
+    if (column instanceof RunLengthEncodedColumn) {
+      // A run-length encoded column that may hold nulls is treated as entirely null, so it has
+      // no value to offer: the remembered value is left untouched and used for the whole run.
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(FloatColumnBuilder.NULL_VALUE_BLOCK, size);
+      }
+      return new RunLengthEncodedColumn(
+          new FloatColumn(1, Optional.empty(), new float[] {value}), size);
+    }
+    float[] filled = new float[size];
+    boolean[] isNull = new boolean[size];
+    boolean hasNull = false;
+    for (int i = 0; i < size; i++) {
+      if (!column.isNull(i)) {
+        value = column.getFloat(i);
+        previousIsNull = false;
+        filled[i] = value;
+      } else if (previousIsNull) {
+        // no earlier value exists yet, so the position stays null
+        isNull[i] = true;
+        hasNull = true;
+      } else {
+        filled[i] = value;
+      }
+    }
+    // attach the null mask only when some position is still null
+    if (hasNull) {
+      return new FloatColumn(size, Optional.of(isNull), filled);
+    }
+    return new FloatColumn(size, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
new file mode 100644
index 0000000000..0b4686c9a2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+/**
+ * {@link IFill} implementation that replaces null positions of one int column with the most
+ * recent non-null value. The remembered value lives in this object, so it carries over from one
+ * call of {@link #fill(TsBlock)} to the next.
+ */
+public class IntPreviousFill implements IFill {
+
+  // most recent non-null value; seeded by the constructor
+  private int value;
+  // whether there is no previous value to fill with
+  // NOTE(review): nothing in this class ever sets this to true, so the branches that keep
+  // positions null cannot currently be reached - confirm whether that is intended.
+  private boolean previousIsNull;
+  // index of the column that needs to be filled
+  private final int columnIndex;
+
+  /**
+   * @param value value used to fill nulls that appear before any non-null value has been seen
+   * @param columnIndex index, within each TsBlock, of the column to fill
+   */
+  public IntPreviousFill(int value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  /**
+   * Fills the configured column of {@code tsBlock} and updates the remembered value.
+   *
+   * @param tsBlock block that holds the column to fill
+   * @return the column itself when it is empty or reports no nulls, otherwise a new column in
+   *     which each null position is replaced by the latest preceding non-null value (or stays
+   *     null while no such value is known)
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    if (size == 0) {
+      return column;
+    }
+    if (!column.mayHaveNull()) {
+      // nothing to fill; only remember the last value for the following block
+      previousIsNull = false;
+      value = column.getInt(size - 1);
+      return column;
+    }
+    if (column instanceof RunLengthEncodedColumn) {
+      // A run-length encoded column that may hold nulls is treated as entirely null, so it has
+      // no value to offer: the remembered value is left untouched and used for the whole run.
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(IntColumnBuilder.NULL_VALUE_BLOCK, size);
+      }
+      return new RunLengthEncodedColumn(
+          new IntColumn(1, Optional.empty(), new int[] {value}), size);
+    }
+    int[] filled = new int[size];
+    boolean[] isNull = new boolean[size];
+    boolean hasNull = false;
+    for (int i = 0; i < size; i++) {
+      if (!column.isNull(i)) {
+        value = column.getInt(i);
+        previousIsNull = false;
+        filled[i] = value;
+      } else if (previousIsNull) {
+        // no earlier value exists yet, so the position stays null
+        isNull[i] = true;
+        hasNull = true;
+      } else {
+        filled[i] = value;
+      }
+    }
+    // attach the null mask only when some position is still null
+    if (hasNull) {
+      return new IntColumn(size, Optional.of(isNull), filled);
+    }
+    return new IntColumn(size, Optional.empty(), filled);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
new file mode 100644
index 0000000000..ca41850020
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Optional;
+
+/**
+ * {@link IFill} implementation that replaces null positions of one long column with the most
+ * recent non-null value. The remembered value lives in this object, so it carries over from one
+ * call of {@link #fill(TsBlock)} to the next.
+ */
+public class LongPreviousFill implements IFill {
+
+  // most recent non-null value; seeded by the constructor
+  private long value;
+  // whether there is no previous value to fill with
+  // NOTE(review): nothing in this class ever sets this to true, so the branches that keep
+  // positions null cannot currently be reached - confirm whether that is intended.
+  private boolean previousIsNull;
+  // index of the column that needs to be filled
+  private final int columnIndex;
+
+  /**
+   * @param value value used to fill nulls that appear before any non-null value has been seen
+   * @param columnIndex index, within each TsBlock, of the column to fill
+   */
+  public LongPreviousFill(long value, int columnIndex) {
+    this.value = value;
+    this.columnIndex = columnIndex;
+  }
+
+  /**
+   * Fills the configured column of {@code tsBlock} and updates the remembered value.
+   *
+   * @param tsBlock block that holds the column to fill
+   * @return the column itself when it is empty or reports no nulls, otherwise a new column in
+   *     which each null position is replaced by the latest preceding non-null value (or stays
+   *     null while no such value is known)
+   */
+  @Override
+  public Column fill(TsBlock tsBlock) {
+    Column column = tsBlock.getColumn(columnIndex);
+    int size = column.getPositionCount();
+    if (size == 0) {
+      return column;
+    }
+    if (!column.mayHaveNull()) {
+      // nothing to fill; only remember the last value for the following block
+      previousIsNull = false;
+      value = column.getLong(size - 1);
+      return column;
+    }
+    if (column instanceof RunLengthEncodedColumn) {
+      // A run-length encoded column that may hold nulls is treated as entirely null, so it has
+      // no value to offer: the remembered value is left untouched and used for the whole run.
+      if (previousIsNull) {
+        return new RunLengthEncodedColumn(LongColumnBuilder.NULL_VALUE_BLOCK, size);
+      }
+      return new RunLengthEncodedColumn(
+          new LongColumn(1, Optional.empty(), new long[] {value}), size);
+    }
+    long[] filled = new long[size];
+    boolean[] isNull = new boolean[size];
+    boolean hasNull = false;
+    for (int i = 0; i < size; i++) {
+      if (!column.isNull(i)) {
+        value = column.getLong(i);
+        previousIsNull = false;
+        filled[i] = value;
+      } else if (previousIsNull) {
+        // no earlier value exists yet, so the position stays null
+        isNull[i] = true;
+        hasNull = true;
+      } else {
+        filled[i] = value;
+      }
+    }
+    // attach the null mask only when some position is still null
+    if (hasNull) {
+      return new LongColumn(size, Optional.of(isNull), filled);
+    }
+    return new LongColumn(size, Optional.empty(), filled);
+  }
+}