You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/07 12:28:48 UTC
[iotdb] 03/03: Add LinearFill
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e391866e417770626a5c7e4646f98814c0f870ff
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sat May 7 20:28:25 2022 +0800
Add LinearFill
---
.../execution/operator/process/FillOperator.java | 49 +++++-
.../execution/operator/process/LimitOperator.java | 3 +
.../operator/process/LinearFillOperator.java | 157 ++++++++++++++++++
.../execution/operator/process/OffsetOperator.java | 3 +
.../mpp/execution/operator/process/fill/IFill.java | 3 +-
.../process/fill/constant/BinaryConstantFill.java | 23 ++-
.../process/fill/constant/BooleanConstantFill.java | 23 ++-
.../process/fill/constant/DoubleConstantFill.java | 23 ++-
.../process/fill/constant/FloatConstantFill.java | 23 ++-
.../process/fill/constant/IntConstantFill.java | 23 ++-
.../process/fill/constant/LongConstantFill.java | 23 ++-
.../process/fill/linear/DoubleLinearFill.java | 94 +++++++++++
.../process/fill/linear/FloatLinearFill.java | 94 +++++++++++
.../process/fill/linear/IntLinearFill.java | 94 +++++++++++
.../operator/process/fill/linear/LinearFill.java | 179 +++++++++++++++++++++
.../process/fill/linear/LongLinearFill.java | 94 +++++++++++
.../process/fill/previous/BinaryPreviousFill.java | 31 ++--
.../process/fill/previous/BooleanPreviousFill.java | 31 ++--
.../process/fill/previous/DoublePreviousFill.java | 31 ++--
.../process/fill/previous/FloatPreviousFill.java | 31 ++--
.../process/fill/previous/IntPreviousFill.java | 31 ++--
.../process/fill/previous/LongPreviousFill.java | 31 ++--
22 files changed, 876 insertions(+), 218 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
index 02657f0b56..14281036ea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FillOperator.java
@@ -18,39 +18,76 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
import com.google.common.util.concurrent.ListenableFuture;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
public class FillOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+ private final IFill[] fillArray;
+ private final Operator child;
+ private final int outputColumnCount;
+
+ public FillOperator(OperatorContext operatorContext, IFill[] fillArray, Operator child) {
+ this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+ checkArgument(
+ fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
+ this.fillArray = fillArray;
+ this.child = requireNonNull(child, "child operator is null");
+ this.outputColumnCount = fillArray.length;
+ }
+
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
public ListenableFuture<Void> isBlocked() {
- return ProcessOperator.super.isBlocked();
+ return child.isBlocked();
}
@Override
public TsBlock next() {
- return null;
+ TsBlock block = child.next();
+ if (block == null) {
+ return null;
+ }
+
+ checkArgument(
+ outputColumnCount == block.getValueColumnCount(),
+ "outputColumnCount is not equal to value column count of child operator's TsBlock");
+
+ Column[] valueColumns = new Column[outputColumnCount];
+
+ for (int i = 0; i < outputColumnCount; i++) {
+ valueColumns[i] = fillArray[i].fill(block.getColumn(i));
+ }
+
+ return TsBlock.wrapBlocksWithoutCopy(
+ block.getPositionCount(), block.getTimeColumn(), valueColumns);
}
@Override
public boolean hasNext() {
- return false;
+ return child.hasNext();
}
@Override
public void close() throws Exception {
- ProcessOperator.super.close();
+ child.close();
}
@Override
public boolean isFinished() {
- return false;
+ return child.isFinished();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
index 9973e256ec..4ae909a2a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LimitOperator.java
@@ -53,6 +53,9 @@ public class LimitOperator implements ProcessOperator {
@Override
public TsBlock next() {
TsBlock block = child.next();
+ if (block == null) {
+ return null;
+ }
TsBlock res = block;
if (block.getPositionCount() <= remainingLimit) {
remainingLimit -= block.getPositionCount();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
new file mode 100644
index 0000000000..e7b523afc1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class LinearFillOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+ private final LinearFill[] fillArray;
+ private final Operator child;
+ private final int outputColumnCount;
+ private final LinkedList<TsBlock> cachedTsBlock;
+ private final Column[] cachedFilledValueColumns;
+
+ private int currentFilledColumnIndex;
+
+ public LinearFillOperator(
+ OperatorContext operatorContext, LinearFill[] fillArray, Operator child) {
+ this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+ checkArgument(
+ fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
+ this.fillArray = fillArray;
+ this.child = requireNonNull(child, "child operator is null");
+ this.outputColumnCount = fillArray.length;
+ this.cachedTsBlock = new LinkedList<>();
+ this.cachedFilledValueColumns = new Column[outputColumnCount];
+ this.currentFilledColumnIndex = 0;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ return child.isBlocked();
+ }
+
+ @Override
+ public TsBlock next() {
+
+ // make sure we call child.next() at most once
+ boolean alreadyCallNext = false;
+ if (cachedTsBlock.isEmpty()) {
+ alreadyCallNext = true;
+ TsBlock nextBlock = child.next();
+ // child operator has no data ready yet (null or empty block), so we just pass that through
+ if (nextBlock == null || nextBlock.isEmpty()) {
+ return nextBlock;
+ } else { // otherwise, we cache it
+ cachedTsBlock.addLast(nextBlock);
+ }
+ }
+
+ TsBlock block = cachedTsBlock.getFirst();
+ long currentEndTime = block.getEndTime();
+ // use the cached TsBlock to keep filling the remaining columns
+ while (currentFilledColumnIndex < outputColumnCount) {
+ if (fillArray[currentFilledColumnIndex].needPrepareForNext(
+ currentEndTime, block.getColumn(currentFilledColumnIndex))) {
+ if (!alreadyCallNext) {
+ alreadyCallNext = true;
+ TsBlock nextBlock = child.next();
+ // child operator has no data ready yet (null or empty block), so we just pass that through
+ if (nextBlock == null || nextBlock.isEmpty()) {
+ return nextBlock;
+ } else { // otherwise, we cache it
+ cachedTsBlock.addLast(nextBlock);
+ }
+ if (fillArray[currentFilledColumnIndex].prepareForNext(
+ currentEndTime,
+ cachedTsBlock.getLast().getTimeColumn(),
+ cachedTsBlock.getLast().getColumn(currentFilledColumnIndex))) {
+ cachedFilledValueColumns[currentFilledColumnIndex] =
+ fillArray[currentFilledColumnIndex].fill(
+ block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+ } else { // more TsBlocks are needed to do current column's fill, so current calculation is
+ // not finished, and we just return null
+ return null;
+ }
+ } else { // more TsBlocks are needed to do current column's fill, so current calculation is
+ // not finished, and we just return null
+ return null;
+ }
+ } else {
+ cachedFilledValueColumns[currentFilledColumnIndex] =
+ fillArray[currentFilledColumnIndex].fill(
+ block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+ }
+ currentFilledColumnIndex++;
+ }
+
+ TsBlock originTsBlock = cachedTsBlock.removeFirst();
+ checkArgument(
+ outputColumnCount == originTsBlock.getValueColumnCount(),
+ "outputColumnCount is not equal to value column count of child operator's TsBlock");
+ TsBlock result =
+ TsBlock.wrapBlocksWithoutCopy(
+ originTsBlock.getPositionCount(),
+ originTsBlock.getTimeColumn(),
+ cachedFilledValueColumns);
+ Arrays.fill(cachedFilledValueColumns, null);
+ currentFilledColumnIndex = 0;
+ return result;
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean hasNext = child.hasNext();
+ if (!hasNext) {
+ for (LinearFill linearFill : fillArray) {
+ linearFill.setNoMoreData();
+ }
+ }
+ return !cachedTsBlock.isEmpty() || hasNext;
+ }
+
+ @Override
+ public void close() throws Exception {
+ child.close();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return cachedTsBlock.isEmpty() && child.isFinished();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
index 04a7915e52..b2136a5b4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/OffsetOperator.java
@@ -53,6 +53,9 @@ public class OffsetOperator implements ProcessOperator {
@Override
public TsBlock next() {
TsBlock block = child.next();
+ if (block == null) {
+ return null;
+ }
if (remainingOffset > 0) {
int offset = Math.min((int) remainingOffset, block.getPositionCount());
remainingOffset -= offset;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
index b7e92fe864..d43325a21c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/IFill.java
@@ -18,10 +18,9 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
public interface IFill {
- Column fill(TsBlock tsBlock);
+ Column fill(Column valueColumn);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
index 9cf6be1ce5..f590adced3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BinaryConstantFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -31,35 +30,31 @@ public class BinaryConstantFill implements IFill {
// fill value
private final Binary value;
- // index of the column which is need to be filled
- private final int columnIndex;
// used for constructing RunLengthEncodedColumn, size of it must be 1
private final Binary[] valueArray;
- public BinaryConstantFill(Binary value, int columnIndex) {
+ public BinaryConstantFill(Binary value) {
this.value = value;
- this.columnIndex = columnIndex;
this.valueArray = new Binary[] {value};
}
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
- return column;
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
return new RunLengthEncodedColumn(new BinaryColumn(1, Optional.empty(), valueArray), size);
} else {
Binary[] array = new Binary[size];
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
array[i] = value;
} else {
- array[i] = column.getBinary(i);
+ array[i] = valueColumn.getBinary(i);
}
}
return new BinaryColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
index 4d60c2cb60..7604248242 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/BooleanConstantFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class BooleanConstantFill implements IFill {
// fill value
private final boolean value;
- // index of the column which is need to be filled
- private final int columnIndex;
// used for constructing RunLengthEncodedColumn
private final boolean[] valueArray;
- public BooleanConstantFill(boolean value, int columnIndex) {
+ public BooleanConstantFill(boolean value) {
this.value = value;
- this.columnIndex = columnIndex;
this.valueArray = new boolean[] {value};
}
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
- return column;
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
return new RunLengthEncodedColumn(new BooleanColumn(1, Optional.empty(), valueArray), size);
} else {
boolean[] array = new boolean[size];
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
array[i] = value;
} else {
- array[i] = column.getBoolean(i);
+ array[i] = valueColumn.getBoolean(i);
}
}
return new BooleanColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
index d3a9244b9c..4615619312 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/DoubleConstantFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class DoubleConstantFill implements IFill {
// fill value
private final double value;
- // index of the column which is need to be filled
- private final int columnIndex;
// used for constructing RunLengthEncodedColumn
private final double[] valueArray;
- public DoubleConstantFill(double value, int columnIndex) {
+ public DoubleConstantFill(double value) {
this.value = value;
- this.columnIndex = columnIndex;
this.valueArray = new double[] {value};
}
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
- return column;
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
return new RunLengthEncodedColumn(new DoubleColumn(1, Optional.empty(), valueArray), size);
} else {
double[] array = new double[size];
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
array[i] = value;
} else {
- array[i] = column.getDouble(i);
+ array[i] = valueColumn.getDouble(i);
}
}
return new DoubleColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
index d58addee1f..f7d0dc0f32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/FloatConstantFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class FloatConstantFill implements IFill {
// fill value
private final float value;
- // index of the column which is need to be filled
- private final int columnIndex;
// used for constructing RunLengthEncodedColumn
private final float[] valueArray;
- public FloatConstantFill(float value, int columnIndex) {
+ public FloatConstantFill(float value) {
this.value = value;
- this.columnIndex = columnIndex;
this.valueArray = new float[] {value};
}
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
- return column;
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
return new RunLengthEncodedColumn(new FloatColumn(1, Optional.empty(), valueArray), size);
} else {
float[] array = new float[size];
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
array[i] = value;
} else {
- array[i] = column.getFloat(i);
+ array[i] = valueColumn.getFloat(i);
}
}
return new FloatColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
index 85c21f2e4c..bbe34abf6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/IntConstantFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class IntConstantFill implements IFill {
// fill value
private final int value;
- // index of the column which is need to be filled
- private final int columnIndex;
// used for constructing RunLengthEncodedColumn
private final int[] valueArray;
- public IntConstantFill(int value, int columnIndex) {
+ public IntConstantFill(int value) {
this.value = value;
- this.columnIndex = columnIndex;
this.valueArray = new int[] {value};
}
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
- return column;
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
return new RunLengthEncodedColumn(new IntColumn(1, Optional.empty(), valueArray), size);
} else {
int[] array = new int[size];
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
array[i] = value;
} else {
- array[i] = column.getInt(i);
+ array[i] = valueColumn.getInt(i);
}
}
return new IntColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
index 1aae78a67f..72721980da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/constant/LongConstantFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.constant;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -30,35 +29,31 @@ public class LongConstantFill implements IFill {
// fill value
private final long value;
- // index of the column which is need to be filled
- private final int columnIndex;
// used for constructing RunLengthEncodedColumn
private final long[] valueArray;
- public LongConstantFill(long value, int columnIndex) {
+ public LongConstantFill(long value) {
this.value = value;
- this.columnIndex = columnIndex;
this.valueArray = new long[] {value};
}
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
- return column;
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
return new RunLengthEncodedColumn(new LongColumn(1, Optional.empty(), valueArray), size);
} else {
long[] array = new long[size];
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
array[i] = value;
} else {
- array[i] = column.getLong(i);
+ array[i] = valueColumn.getLong(i);
}
}
return new LongColumn(size, Optional.empty(), array);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
new file mode 100644
index 0000000000..6dd68192c2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+
+import java.util.Optional;
+
+public class DoubleLinearFill extends LinearFill {
+
+ // previous value
+ private double previousValue;
+ // next non-null value whose time is closest to the current TsBlock's endTime
+ private double nextValue;
+
+ private double nextValueInCurrentColumn;
+
+ @Override
+ void fillValue(Column column, int index, Object array) {
+ ((double[]) array)[index] = column.getDouble(index);
+ }
+
+ @Override
+ void fillValue(Object array, int index) {
+ ((double[]) array)[index] = getFilledValue();
+ }
+
+ @Override
+ Object createValueArray(int size) {
+ return new double[size];
+ }
+
+ @Override
+ Column createNullValueColumn() {
+ return DoubleColumnBuilder.NULL_VALUE_BLOCK;
+ }
+
+ @Override
+ Column createFilledValueColumn() {
+ double filledValue = getFilledValue();
+ return new DoubleColumn(1, Optional.empty(), new double[] {filledValue});
+ }
+
+ @Override
+ Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+ if (nonNullValue) {
+ return new DoubleColumn(size, Optional.empty(), (double[]) array);
+ } else {
+ return new DoubleColumn(size, Optional.of(isNull), (double[]) array);
+ }
+ }
+
+ @Override
+ void updatePreviousValue(Column column, int index) {
+ previousValue = column.getDouble(index);
+ }
+
+ @Override
+ void updateNextValue(Column nextValueColumn, int index) {
+ this.nextValue = nextValueColumn.getDouble(index);
+ }
+
+ @Override
+ void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+ this.nextValueInCurrentColumn = nextValueColumn.getDouble(index);
+ }
+
+ @Override
+ void updateNextValueInCurrentColumn() {
+ this.nextValueInCurrentColumn = this.nextValue;
+ }
+
+ private double getFilledValue() {
+ return (previousValue + nextValueInCurrentColumn) / 2.0;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
new file mode 100644
index 0000000000..2cc80f2fd0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+
+import java.util.Optional;
+
+/** {@link LinearFill} specialisation for {@code float} value columns. */
+public class FloatLinearFill extends LinearFill {
+
+  // value recorded by the most recent updatePreviousValue call
+  private float previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private float nextValue;
+  // right-hand value read by getFilledValue(); taken from a value column or copied from nextValue
+  private float nextValueInCurrentColumn;
+
+  /** Copies the value at {@code index} of {@code column} into the same slot of the float array. */
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    float[] values = (float[]) array;
+    values[index] = column.getFloat(index);
+  }
+
+  /** Writes the value computed by {@code getFilledValue()} into slot {@code index}. */
+  @Override
+  void fillValue(Object array, int index) {
+    float[] values = (float[]) array;
+    values[index] = getFilledValue();
+  }
+
+  /** Allocates the {@code float[]} backing store for a filled column of {@code size} positions. */
+  @Override
+  Object createValueArray(int size) {
+    float[] values = new float[size];
+    return values;
+  }
+
+  /** Returns the shared {@code FloatColumnBuilder.NULL_VALUE_BLOCK} column. */
+  @Override
+  Column createNullValueColumn() {
+    Column nullColumn = FloatColumnBuilder.NULL_VALUE_BLOCK;
+    return nullColumn;
+  }
+
+  /** Builds a one-position column, without null flags, holding the current filled value. */
+  @Override
+  Column createFilledValueColumn() {
+    return new FloatColumn(1, Optional.empty(), new float[] {getFilledValue()});
+  }
+
+  /**
+   * Wraps the filled {@code float[]} into a column of {@code size} positions. The {@code isNull}
+   * flags are attached only when {@code nonNullValue} is false.
+   */
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (!nonNullValue) {
+      return new FloatColumn(size, Optional.of(isNull), (float[]) array);
+    }
+    return new FloatColumn(size, Optional.empty(), (float[]) array);
+  }
+
+  /** Records the value at {@code index} of {@code column} as the previous value. */
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    float latest = column.getFloat(index);
+    this.previousValue = latest;
+  }
+
+  /** Records the value at {@code index} of {@code nextValueColumn} as the next value. */
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    float upcoming = nextValueColumn.getFloat(index);
+    nextValue = upcoming;
+  }
+
+  /** Takes the next value for the current column from {@code index} of {@code nextValueColumn}. */
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    float upcoming = nextValueColumn.getFloat(index);
+    nextValueInCurrentColumn = upcoming;
+  }
+
+  /** Takes the next value for the current column from the stored {@code nextValue}. */
+  @Override
+  void updateNextValueInCurrentColumn() {
+    nextValueInCurrentColumn = nextValue;
+  }
+
+  /**
+   * Returns the arithmetic mean of {@code previousValue} and {@code nextValueInCurrentColumn};
+   * timestamps are not taken into account.
+   */
+  private float getFilledValue() {
+    float sum = previousValue + nextValueInCurrentColumn;
+    return sum / 2.0f;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
new file mode 100644
index 0000000000..97ea4d05cc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+
+import java.util.Optional;
+
+/** {@link LinearFill} specialisation for {@code int} value columns. */
+public class IntLinearFill extends LinearFill {
+
+  // value recorded by the most recent updatePreviousValue call
+  private int previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private int nextValue;
+  // right-hand value read by getFilledValue(); taken from a value column or copied from nextValue
+  private int nextValueInCurrentColumn;
+
+  /** Copies the value at {@code index} of {@code column} into the same slot of the int array. */
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((int[]) array)[index] = column.getInt(index);
+  }
+
+  /** Writes the value computed by {@code getFilledValue()} into slot {@code index}. */
+  @Override
+  void fillValue(Object array, int index) {
+    ((int[]) array)[index] = getFilledValue();
+  }
+
+  /** Allocates the {@code int[]} backing store for a filled column of {@code size} positions. */
+  @Override
+  Object createValueArray(int size) {
+    return new int[size];
+  }
+
+  /** Returns the shared {@code IntColumnBuilder.NULL_VALUE_BLOCK} column. */
+  @Override
+  Column createNullValueColumn() {
+    return IntColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  /** Builds a one-position column, without null flags, holding the current filled value. */
+  @Override
+  Column createFilledValueColumn() {
+    int filledValue = getFilledValue();
+    return new IntColumn(1, Optional.empty(), new int[] {filledValue});
+  }
+
+  /**
+   * Wraps the filled {@code int[]} into a column of {@code size} positions. The {@code isNull}
+   * flags are attached only when {@code nonNullValue} is false.
+   */
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new IntColumn(size, Optional.empty(), (int[]) array);
+    } else {
+      return new IntColumn(size, Optional.of(isNull), (int[]) array);
+    }
+  }
+
+  /** Records the value at {@code index} of {@code column} as the previous value. */
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getInt(index);
+  }
+
+  /** Records the value at {@code index} of {@code nextValueColumn} as the next value. */
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getInt(index);
+  }
+
+  /** Takes the next value for the current column from {@code index} of {@code nextValueColumn}. */
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getInt(index);
+  }
+
+  /** Takes the next value for the current column from the stored {@code nextValue}. */
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  /**
+   * Returns the mean of {@code previousValue} and {@code nextValueInCurrentColumn}, truncated
+   * toward zero; timestamps are not taken into account.
+   */
+  private int getFilledValue() {
+    // Add in long arithmetic: an int sum wraps around when the two neighbours together exceed
+    // the int range (e.g. 2_000_000_000 + 2_000_000_000), which produced a mean of the wrong sign.
+    long sum = (long) previousValue + nextValueInCurrentColumn;
+    // the mean always lies between the two operands, so narrowing back to int is lossless
+    return (int) (sum / 2);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
new file mode 100644
index 0000000000..4a137c9eba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+/**
+ * Base class of the linear fill implementations. It keeps the bookkeeping shared by all value
+ * types (null flags and the timestamps of the neighbouring non-null values) and delegates every
+ * type-specific read, write and column construction to its subclasses.
+ *
+ * <p>Intended call order for one value column: check {@link #needPrepareForNext}; while more
+ * information is required, pass following TsBlocks to {@link #prepareForNext} or call {@link
+ * #setNoMoreData()}; then call {@link #fill}.
+ */
+public abstract class LinearFill {
+
+  // whether no non-null value has been seen so far
+  protected boolean previousIsNull = true;
+  // time of the next non-null value found by prepareForNext; Long.MIN_VALUE until one is found
+  protected long nextTime = Long.MIN_VALUE;
+  // Time of the next non-null value used for the position currently being filled. It starts at
+  // Long.MIN_VALUE, the same "not prepared" sentinel as nextTime: with the implicit default of 0,
+  // prepareForNextValueInCurrentColumn returned early for every timestamp <= 0.
+  protected long nextTimeInCurrentColumn = Long.MIN_VALUE;
+  // whether the input is exhausted, i.e. no following column can supply a next value
+  protected boolean noMoreNext = false;
+
+  /**
+   * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
+   * has been set to true
+   *
+   * @param timeColumn TimeColumn of valueColumn
+   * @param valueColumn valueColumn that need to be filled
+   * @return Value Column that has been filled; positions that lack a previous or a next non-null
+   *     value stay null
+   */
+  public Column fill(TimeColumn timeColumn, Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+    if (!valueColumn.mayHaveNull() || size == 0) {
+      if (size != 0) {
+        previousIsNull = false;
+        // remember the last value so that later columns can be filled from it
+        updatePreviousValue(valueColumn, size - 1);
+      }
+      return valueColumn;
+    }
+
+    // if its values are all null
+    if (valueColumn instanceof RunLengthEncodedColumn) {
+      // previous value is null or next value is null, we just return NULL_VALUE_BLOCK
+      if (previousIsNull || nextTime < timeColumn.getStartTime()) {
+        return new RunLengthEncodedColumn(createNullValueColumn(), size);
+      } else {
+        prepareForNextValueInCurrentColumn(
+            timeColumn.getEndTime(), timeColumn.getPositionCount() - 1, timeColumn, valueColumn);
+        return new RunLengthEncodedColumn(createFilledValueColumn(), size);
+      }
+    } else {
+      Object array = createValueArray(size);
+      boolean[] isNull = new boolean[size];
+      // stays true as long as every position could be given a value
+      boolean nonNullValue = true;
+
+      for (int i = 0; i < size; i++) {
+        // current value is null, we need to fill it
+        if (valueColumn.isNull(i)) {
+          long currentTime = timeColumn.getLong(i);
+          prepareForNextValueInCurrentColumn(currentTime, i + 1, timeColumn, valueColumn);
+          // we don't fill it, if either previous value or next value is null
+          if (previousIsNull || nextIsNull(currentTime)) {
+            isNull[i] = true;
+            nonNullValue = false;
+          } else {
+            // fill value using previous and next value
+            fillValue(array, i);
+          }
+        } else { // current is not null
+          // fill value using its own value
+          fillValue(valueColumn, i, array);
+          // update previous value
+          updatePreviousValue(valueColumn, i);
+          previousIsNull = false;
+        }
+      }
+      return createFilledValueColumn(array, isNull, nonNullValue, size);
+    }
+  }
+
+  /**
+   * @param time end time of current valueColumn that need to be filled
+   * @param valueColumn valueColumn that need to be filled
+   * @return true if valueColumn can't be filled using current information, and we need to get next
+   *     TsBlock and then call prepareForNext; false if valueColumn can be filled using current
+   *     information (or is empty), and we can directly call fill() function
+   */
+  public boolean needPrepareForNext(long time, Column valueColumn) {
+    int size = valueColumn.getPositionCount();
+    // an empty column has no trailing null to fill, and no position size - 1 to inspect
+    if (size == 0) {
+      return false;
+    }
+    return !noMoreNext && time > nextTime && valueColumn.isNull(size - 1);
+  }
+
+  /**
+   * @param time end time of current valueColumn that need to be filled
+   * @param nextTimeColumn TimeColumn of next TsBlock
+   * @param nextValueColumn Value Column of next TsBlock
+   * @return true if we get enough information to fill current column, and we can stop getting next
+   *     TsBlock and calling prepareForNext; false if we still don't get enough information to fill
+   *     current column, and still need to keep getting next TsBlock and then call prepareForNext
+   */
+  public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
+    // a next value at or after the requested time is already known
+    if (time <= nextTime) {
+      return true;
+    }
+
+    // take the first non-null value of the next column
+    for (int i = 0; i < nextValueColumn.getPositionCount(); i++) {
+      if (!nextValueColumn.isNull(i)) {
+        updateNextValue(nextValueColumn, i);
+        this.nextTime = nextTimeColumn.getLong(i);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** no more next column */
+  public void setNoMoreData() {
+    this.noMoreNext = true;
+  }
+
+  /** Returns true when no next value later than {@code time} is available. */
+  private boolean nextIsNull(long time) {
+    return nextTimeInCurrentColumn <= time;
+  }
+
+  /**
+   * Makes nextTimeInCurrentColumn and the matching next value refer to the first non-null value
+   * at or after {@code startIndex} of the current column, falling back to the value found by
+   * prepareForNext when the current column has none.
+   */
+  private void prepareForNextValueInCurrentColumn(
+      long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
+    // the value prepared earlier is still at or after the requested time
+    if (time <= nextTimeInCurrentColumn) {
+      return;
+    }
+    for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        this.nextTimeInCurrentColumn = timeColumn.getLong(i);
+        updateNextValueInCurrentColumn(valueColumn, i);
+        return;
+      }
+    }
+
+    // current column's value is not enough for filling, we should use value of next Column
+    this.nextTimeInCurrentColumn = this.nextTime;
+    updateNextValueInCurrentColumn();
+  }
+
+  /** copy the value at index of column into the same slot of array */
+  abstract void fillValue(Column column, int index, Object array);
+
+  /** write the filled value into slot index of array */
+  abstract void fillValue(Object array, int index);
+
+  /** create the type-specific value array holding size positions */
+  abstract Object createValueArray(int size);
+
+  /** create the single-position null column wrapped for an unfillable all-null input */
+  abstract Column createNullValueColumn();
+
+  /** create the single-position column holding the filled value for an all-null input */
+  abstract Column createFilledValueColumn();
+
+  /** create the result column from the filled array; isNull is only relevant if !nonNullValue */
+  abstract Column createFilledValueColumn(
+      Object array, boolean[] isNull, boolean nonNullValue, int size);
+
+  /** update previous value using the value at index of column */
+  abstract void updatePreviousValue(Column column, int index);
+
+  /** update next value using the value at index of nextValueColumn */
+  abstract void updateNextValue(Column nextValueColumn, int index);
+
+  /** update nextValueInCurrentColumn using the value at index of the given column */
+  abstract void updateNextValueInCurrentColumn(Column nextValueColumn, int index);
+
+  /** update nextValueInCurrentColumn using value of next Column */
+  abstract void updateNextValueInCurrentColumn();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
new file mode 100644
index 0000000000..dd0e3e75fb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+
+import java.util.Optional;
+
+/** {@link LinearFill} specialisation for {@code long} value columns. */
+public class LongLinearFill extends LinearFill {
+
+  // value recorded by the most recent updatePreviousValue call
+  private long previousValue;
+  // next non-null value whose time is closest to the current TsBlock's endTime
+  private long nextValue;
+  // right-hand value read by getFilledValue(); taken from a value column or copied from nextValue
+  private long nextValueInCurrentColumn;
+
+  /** Copies the value at {@code index} of {@code column} into the same slot of the long array. */
+  @Override
+  void fillValue(Column column, int index, Object array) {
+    ((long[]) array)[index] = column.getLong(index);
+  }
+
+  /** Writes the value computed by {@code getFilledValue()} into slot {@code index}. */
+  @Override
+  void fillValue(Object array, int index) {
+    ((long[]) array)[index] = getFilledValue();
+  }
+
+  /** Allocates the {@code long[]} backing store for a filled column of {@code size} positions. */
+  @Override
+  Object createValueArray(int size) {
+    return new long[size];
+  }
+
+  /** Returns the shared {@code LongColumnBuilder.NULL_VALUE_BLOCK} column. */
+  @Override
+  Column createNullValueColumn() {
+    return LongColumnBuilder.NULL_VALUE_BLOCK;
+  }
+
+  /** Builds a one-position column, without null flags, holding the current filled value. */
+  @Override
+  Column createFilledValueColumn() {
+    long filledValue = getFilledValue();
+    return new LongColumn(1, Optional.empty(), new long[] {filledValue});
+  }
+
+  /**
+   * Wraps the filled {@code long[]} into a column of {@code size} positions. The {@code isNull}
+   * flags are attached only when {@code nonNullValue} is false.
+   */
+  @Override
+  Column createFilledValueColumn(Object array, boolean[] isNull, boolean nonNullValue, int size) {
+    if (nonNullValue) {
+      return new LongColumn(size, Optional.empty(), (long[]) array);
+    } else {
+      return new LongColumn(size, Optional.of(isNull), (long[]) array);
+    }
+  }
+
+  /** Records the value at {@code index} of {@code column} as the previous value. */
+  @Override
+  void updatePreviousValue(Column column, int index) {
+    previousValue = column.getLong(index);
+  }
+
+  /** Records the value at {@code index} of {@code nextValueColumn} as the next value. */
+  @Override
+  void updateNextValue(Column nextValueColumn, int index) {
+    this.nextValue = nextValueColumn.getLong(index);
+  }
+
+  /** Takes the next value for the current column from {@code index} of {@code nextValueColumn}. */
+  @Override
+  void updateNextValueInCurrentColumn(Column nextValueColumn, int index) {
+    this.nextValueInCurrentColumn = nextValueColumn.getLong(index);
+  }
+
+  /** Takes the next value for the current column from the stored {@code nextValue}. */
+  @Override
+  void updateNextValueInCurrentColumn() {
+    this.nextValueInCurrentColumn = this.nextValue;
+  }
+
+  /**
+   * Returns the mean of {@code previousValue} and {@code nextValueInCurrentColumn}, truncated
+   * toward zero; timestamps are not taken into account. The computation cannot overflow, unlike
+   * the plain {@code (a + b) / 2}.
+   */
+  private long getFilledValue() {
+    long left = previousValue;
+    long right = nextValueInCurrentColumn;
+    if ((left ^ right) < 0) {
+      // opposite signs: the sum always fits into a long
+      return (left + right) / 2L;
+    }
+    // Same sign: the sum may wrap around, so halve each operand first. The remainder term adds
+    // back the unit that is lost when both operands are odd.
+    return left / 2L + right / 2L + (left % 2L + right % 2L) / 2L;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
index 897cd21d7a..e6cbf22b4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BinaryPreviousFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -33,35 +32,25 @@ public class BinaryPreviousFill implements IFill {
// previous value
private Binary value;
// whether previous value is null
- private boolean previousIsNull;
- // index of the column which is need to be filled
- private final int columnIndex;
-
- public BinaryPreviousFill(Binary value, int columnIndex) {
- this.value = value;
- this.columnIndex = columnIndex;
- }
+ private boolean previousIsNull = true;
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
if (size != 0) {
previousIsNull = false;
// update the value using last non-null value
- value = column.getBinary(size - 1);
+ value = valueColumn.getBinary(size - 1);
}
- return column;
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
if (previousIsNull) {
return new RunLengthEncodedColumn(BinaryColumnBuilder.NULL_VALUE_BLOCK, size);
} else {
- // update the value using last non-null value
- value = column.getBinary(size - 1);
return new RunLengthEncodedColumn(
new BinaryColumn(1, Optional.empty(), new Binary[] {value}), size);
}
@@ -71,7 +60,7 @@ public class BinaryPreviousFill implements IFill {
// have no null value
boolean nonNullValue = true;
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
if (previousIsNull) {
isNull[i] = true;
nonNullValue = false;
@@ -79,7 +68,7 @@ public class BinaryPreviousFill implements IFill {
array[i] = value;
}
} else {
- array[i] = column.getBinary(i);
+ array[i] = valueColumn.getBinary(i);
value = array[i];
previousIsNull = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
index a1f3c5a9e7..a7a1da249c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/BooleanPreviousFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -32,35 +31,25 @@ public class BooleanPreviousFill implements IFill {
// previous value
private boolean value;
// whether previous value is null
- private boolean previousIsNull;
- // index of the column which is need to be filled
- private final int columnIndex;
-
- public BooleanPreviousFill(boolean value, int columnIndex) {
- this.value = value;
- this.columnIndex = columnIndex;
- }
+ private boolean previousIsNull = true;
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
if (size != 0) {
previousIsNull = false;
// update the value using last non-null value
- value = column.getBoolean(size - 1);
+ value = valueColumn.getBoolean(size - 1);
}
- return column;
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
if (previousIsNull) {
return new RunLengthEncodedColumn(BooleanColumnBuilder.NULL_VALUE_BLOCK, size);
} else {
- // update the value using last non-null value
- value = column.getBoolean(size - 1);
return new RunLengthEncodedColumn(
new BooleanColumn(1, Optional.empty(), new boolean[] {value}), size);
}
@@ -70,7 +59,7 @@ public class BooleanPreviousFill implements IFill {
// have no null value
boolean nonNullValue = true;
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
if (previousIsNull) {
isNull[i] = true;
nonNullValue = false;
@@ -78,7 +67,7 @@ public class BooleanPreviousFill implements IFill {
array[i] = value;
}
} else {
- array[i] = column.getBoolean(i);
+ array[i] = valueColumn.getBoolean(i);
value = array[i];
previousIsNull = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
index e113ffffd5..fd0e1892ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/DoublePreviousFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -32,35 +31,25 @@ public class DoublePreviousFill implements IFill {
// previous value
private double value;
// whether previous value is null
- private boolean previousIsNull;
- // index of the column which is need to be filled
- private final int columnIndex;
-
- public DoublePreviousFill(double value, int columnIndex) {
- this.value = value;
- this.columnIndex = columnIndex;
- }
+ private boolean previousIsNull = true;
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
if (size != 0) {
previousIsNull = false;
// update the value using last non-null value
- value = column.getDouble(size - 1);
+ value = valueColumn.getDouble(size - 1);
}
- return column;
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
if (previousIsNull) {
return new RunLengthEncodedColumn(DoubleColumnBuilder.NULL_VALUE_BLOCK, size);
} else {
- // update the value using last non-null value
- value = column.getDouble(size - 1);
return new RunLengthEncodedColumn(
new DoubleColumn(1, Optional.empty(), new double[] {value}), size);
}
@@ -70,7 +59,7 @@ public class DoublePreviousFill implements IFill {
// have no null value
boolean nonNullValue = true;
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
if (previousIsNull) {
isNull[i] = true;
nonNullValue = false;
@@ -78,7 +67,7 @@ public class DoublePreviousFill implements IFill {
array[i] = value;
}
} else {
- array[i] = column.getDouble(i);
+ array[i] = valueColumn.getDouble(i);
value = array[i];
previousIsNull = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
index d482430a6d..9c8f221f72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/FloatPreviousFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -32,35 +31,25 @@ public class FloatPreviousFill implements IFill {
// previous value
private float value;
// whether previous value is null
- private boolean previousIsNull;
- // index of the column which is need to be filled
- private final int columnIndex;
-
- public FloatPreviousFill(float value, int columnIndex) {
- this.value = value;
- this.columnIndex = columnIndex;
- }
+ private boolean previousIsNull = true;
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
if (size != 0) {
previousIsNull = false;
// update the value using last non-null value
- value = column.getFloat(size - 1);
+ value = valueColumn.getFloat(size - 1);
}
- return column;
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
if (previousIsNull) {
return new RunLengthEncodedColumn(FloatColumnBuilder.NULL_VALUE_BLOCK, size);
} else {
- // update the value using last non-null value
- value = column.getFloat(size - 1);
return new RunLengthEncodedColumn(
new FloatColumn(1, Optional.empty(), new float[] {value}), size);
}
@@ -70,7 +59,7 @@ public class FloatPreviousFill implements IFill {
// have no null value
boolean nonNullValue = true;
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
if (previousIsNull) {
isNull[i] = true;
nonNullValue = false;
@@ -78,7 +67,7 @@ public class FloatPreviousFill implements IFill {
array[i] = value;
}
} else {
- array[i] = column.getFloat(i);
+ array[i] = valueColumn.getFloat(i);
value = array[i];
previousIsNull = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
index 6e95f028b1..a444dedc05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/IntPreviousFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -32,35 +31,25 @@ public class IntPreviousFill implements IFill {
// previous value
private int value;
// whether previous value is null
- private boolean previousIsNull;
- // index of the column which is need to be filled
- private final int columnIndex;
-
- public IntPreviousFill(int value, int columnIndex) {
- this.value = value;
- this.columnIndex = columnIndex;
- }
+ private boolean previousIsNull = true;
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
if (size != 0) {
previousIsNull = false;
// update the value using last non-null value
- value = column.getInt(size - 1);
+ value = valueColumn.getInt(size - 1);
}
- return column;
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
if (previousIsNull) {
return new RunLengthEncodedColumn(IntColumnBuilder.NULL_VALUE_BLOCK, size);
} else {
- // update the value using last non-null value
- value = column.getInt(size - 1);
return new RunLengthEncodedColumn(
new IntColumn(1, Optional.empty(), new int[] {value}), size);
}
@@ -70,7 +59,7 @@ public class IntPreviousFill implements IFill {
// have no null value
boolean nonNullValue = true;
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
if (previousIsNull) {
isNull[i] = true;
nonNullValue = false;
@@ -78,7 +67,7 @@ public class IntPreviousFill implements IFill {
array[i] = value;
}
} else {
- array[i] = column.getInt(i);
+ array[i] = valueColumn.getInt(i);
value = array[i];
previousIsNull = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
index 116f616aab..c145cc007f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/previous/LongPreviousFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.previous;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -32,35 +31,25 @@ public class LongPreviousFill implements IFill {
// previous value
private long value;
// whether previous value is null
- private boolean previousIsNull;
- // index of the column which is need to be filled
- private final int columnIndex;
-
- public LongPreviousFill(long value, int columnIndex) {
- this.value = value;
- this.columnIndex = columnIndex;
- }
+ private boolean previousIsNull = true;
@Override
- public Column fill(TsBlock tsBlock) {
- Column column = tsBlock.getColumn(columnIndex);
- int size = column.getPositionCount();
- // if this column doesn't have any null value, or it's empty, just return itself;
- if (!column.mayHaveNull() || size == 0) {
+ public Column fill(Column valueColumn) {
+ int size = valueColumn.getPositionCount();
+ // if this valueColumn doesn't have any null value, or it's empty, just return itself;
+ if (!valueColumn.mayHaveNull() || size == 0) {
if (size != 0) {
previousIsNull = false;
// update the value using last non-null value
- value = column.getLong(size - 1);
+ value = valueColumn.getLong(size - 1);
}
- return column;
+ return valueColumn;
}
// if its values are all null
- if (column instanceof RunLengthEncodedColumn) {
+ if (valueColumn instanceof RunLengthEncodedColumn) {
if (previousIsNull) {
return new RunLengthEncodedColumn(LongColumnBuilder.NULL_VALUE_BLOCK, size);
} else {
- // update the value using last non-null value
- value = column.getLong(size - 1);
return new RunLengthEncodedColumn(
new LongColumn(1, Optional.empty(), new long[] {value}), size);
}
@@ -70,7 +59,7 @@ public class LongPreviousFill implements IFill {
// have no null value
boolean nonNullValue = true;
for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
+ if (valueColumn.isNull(i)) {
if (previousIsNull) {
isNull[i] = true;
nonNullValue = false;
@@ -78,7 +67,7 @@ public class LongPreviousFill implements IFill {
array[i] = value;
}
} else {
- array[i] = column.getLong(i);
+ array[i] = valueColumn.getLong(i);
value = array[i];
previousIsNull = false;
}