You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/08 04:39:14 UTC
[iotdb] branch FillOperator updated: done
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch FillOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/FillOperator by this push:
new 72dd5b03fd done
72dd5b03fd is described below
commit 72dd5b03fd7fb304da9a5a38213a2a262148ae3d
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Sun May 8 12:38:44 2022 +0800
done
---
.../operator/process/LinearFillOperator.java | 84 ++--
.../operator/process/fill/linear/LinearFill.java | 14 +-
.../mpp/execution/operator/FillOperatorTest.java | 353 +++++++++++++++++
.../execution/operator/LinearFillOperatorTest.java | 441 +++++++++++++++++++++
4 files changed, 857 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 3d124189f7..f2f713f174 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -26,8 +26,9 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -40,10 +41,19 @@ public class LinearFillOperator implements ProcessOperator {
private final Operator child;
private final int outputColumnCount;
// TODO need to spill it to disk if it consumes too much memory
- private final LinkedList<TsBlock> cachedTsBlock;
+ private final List<TsBlock> cachedTsBlock;
+ // next TsBlock Index for each Column
+ private final int[] nextTsBlockIndex;
+ // cache already filled column in last unfinished filling
private final Column[] cachedFilledValueColumns;
+ // indicate which column we are filling
private int currentFilledColumnIndex;
+ // indicates whether we can call child.next()
+ // it's used to make sure that child.next() is called at most once per LinearFillOperator.next()
+ private boolean canCallNext;
+ // indicate whether there is more TsBlock for child operator
+ private boolean noMoreTsBlock;
public LinearFillOperator(
OperatorContext operatorContext, LinearFill[] fillArray, Operator child) {
@@ -53,9 +63,13 @@ public class LinearFillOperator implements ProcessOperator {
this.fillArray = fillArray;
this.child = requireNonNull(child, "child operator is null");
this.outputColumnCount = fillArray.length;
- this.cachedTsBlock = new LinkedList<>();
+ this.cachedTsBlock = new ArrayList<>();
+ this.nextTsBlockIndex = new int[outputColumnCount];
+ Arrays.fill(this.nextTsBlockIndex, 1);
this.cachedFilledValueColumns = new Column[outputColumnCount];
this.currentFilledColumnIndex = 0;
+ this.canCallNext = false;
+ this.noMoreTsBlock = true;
}
@Override
@@ -72,49 +86,64 @@ public class LinearFillOperator implements ProcessOperator {
public TsBlock next() {
// make sure we call child.next() at most once
- boolean alreadyCallNext = false;
if (cachedTsBlock.isEmpty()) {
- alreadyCallNext = true;
+ canCallNext = false;
TsBlock nextBlock = child.next();
// child operator's calculation is not finished, so we just return null
if (nextBlock == null || nextBlock.isEmpty()) {
return nextBlock;
} else { // otherwise, we cache it
- cachedTsBlock.addLast(nextBlock);
+ cachedTsBlock.add(nextBlock);
}
}
- TsBlock block = cachedTsBlock.getFirst();
+ TsBlock block = cachedTsBlock.get(0);
long currentEndTime = block.getEndTime();
// use cached TsBlock to keep filling remaining column
while (currentFilledColumnIndex < outputColumnCount) {
+ // current valueColumn can't be filled using current information
if (fillArray[currentFilledColumnIndex].needPrepareForNext(
currentEndTime, block.getColumn(currentFilledColumnIndex))) {
- if (!alreadyCallNext) {
- alreadyCallNext = true;
+
+ if (canCallNext) { // if we can call child.next(), we call that and cache it in
+ // cachedTsBlock
+ canCallNext = false;
TsBlock nextBlock = child.next();
// child operator's calculation is not finished, so we just return null
if (nextBlock == null || nextBlock.isEmpty()) {
return nextBlock;
} else { // otherwise, we cache it
- cachedTsBlock.addLast(nextBlock);
+ cachedTsBlock.add(nextBlock);
}
+ }
+
+ // the next TsBlock is already in cachedTsBlock
+ while (nextTsBlockIndex[currentFilledColumnIndex] < cachedTsBlock.size()) {
+ TsBlock nextTsBlock = cachedTsBlock.get(nextTsBlockIndex[currentFilledColumnIndex]);
+ nextTsBlockIndex[currentFilledColumnIndex]++;
if (fillArray[currentFilledColumnIndex].prepareForNext(
currentEndTime,
- cachedTsBlock.getLast().getTimeColumn(),
- cachedTsBlock.getLast().getColumn(currentFilledColumnIndex))) {
+ nextTsBlock.getTimeColumn(),
+ nextTsBlock.getColumn(currentFilledColumnIndex))) {
+ cachedFilledValueColumns[currentFilledColumnIndex] =
+ fillArray[currentFilledColumnIndex].fill(
+ block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
+ break;
+ }
+ }
+
+ // current column's filling is not finished using current owning TsBlock
+ if (cachedFilledValueColumns[currentFilledColumnIndex] == null) {
+ if (noMoreTsBlock) { // there is no more TsBlock, just use current owning TsBlock to fill
cachedFilledValueColumns[currentFilledColumnIndex] =
fillArray[currentFilledColumnIndex].fill(
block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
- } else { // more TsBlocks is needed to do current column's fill, so current calculation is
- // not finished, and we just return null
+ } else { // next TsBlock is not ready, more TsBlocks are needed to do current column's
+ // fill, so current calculation is not finished, and we just return null
return null;
}
- } else { // more TsBlocks is needed to do current column's fill, so current calculation is
- // not finished, and we just return null
- return null;
}
- } else {
+ } else { // current valueColumn can be filled using current information
cachedFilledValueColumns[currentFilledColumnIndex] =
fillArray[currentFilledColumnIndex].fill(
block.getTimeColumn(), block.getColumn(currentFilledColumnIndex));
@@ -122,29 +151,30 @@ public class LinearFillOperator implements ProcessOperator {
currentFilledColumnIndex++;
}
- TsBlock originTsBlock = cachedTsBlock.removeFirst();
+ TsBlock originTsBlock = cachedTsBlock.remove(0);
checkArgument(
outputColumnCount == originTsBlock.getValueColumnCount(),
"outputColumnCount is not equal to value column count of child operator's TsBlock");
TsBlock result =
- TsBlock.wrapBlocksWithoutCopy(
+ new TsBlock(
originTsBlock.getPositionCount(),
originTsBlock.getTimeColumn(),
cachedFilledValueColumns);
Arrays.fill(cachedFilledValueColumns, null);
+ for (int i = 0; i < outputColumnCount; i++) {
+ nextTsBlockIndex[i]--;
+ }
currentFilledColumnIndex = 0;
return result;
}
@Override
public boolean hasNext() {
- boolean hasNext = child.hasNext();
- if (!hasNext) {
- for (LinearFill linearFill : fillArray) {
- linearFill.setNoMoreData();
- }
- }
- return !cachedTsBlock.isEmpty() || hasNext;
+ // if child.hasNext() returns false, it means that there are no more TsBlocks
+ noMoreTsBlock = !child.hasNext();
+ // if there are more TsBlocks, we can call child.next() once
+ canCallNext = !noMoreTsBlock;
+ return !cachedTsBlock.isEmpty() || !noMoreTsBlock;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index d06a679d2d..f7c3576ec4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -22,6 +22,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import static com.google.common.base.Preconditions.checkArgument;
+
public abstract class LinearFill {
// whether previous value is null
@@ -30,8 +32,6 @@ public abstract class LinearFill {
protected long nextTime = Long.MIN_VALUE;
protected long nextTimeInCurrentColumn;
- // whether next value is null
- protected boolean noMoreNext = false;
/**
* Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
@@ -102,7 +102,7 @@ public abstract class LinearFill {
* information, and we can directly call fill() function
*/
public boolean needPrepareForNext(long time, Column valueColumn) {
- return !noMoreNext && time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+ return time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1);
}
/**
@@ -114,6 +114,9 @@ public abstract class LinearFill {
* current column, and still need to keep getting next TsBlock and then call prepareForNext
*/
public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
+ checkArgument(
+ nextTimeColumn.getPositionCount() > 0 && nextTimeColumn.getLong(0) > time,
+ "nextColumn's time should be greater than current time");
if (time <= nextTime) {
return true;
}
@@ -128,11 +131,6 @@ public abstract class LinearFill {
return false;
}
- /** no more next column */
- public void setNoMoreData() {
- this.noMoreNext = true;
- }
-
private boolean nextIsNull(long time) {
return nextTimeInCurrentColumn <= time;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
new file mode 100644
index 0000000000..5cbe7b189f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/FillOperatorTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class FillOperatorTest {
+
+ @Test
+ public void batchConstantFillTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, FillOperator.class.getSimpleName());
+
+ IFill[] fillArray =
+ new IFill[] {
+ new DoubleConstantFill(520.0),
+ new DoubleConstantFill(520.0),
+ new DoubleConstantFill(520.0)
+ };
+ FillOperator fillOperator =
+ new FillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ int delta = index * 10000;
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ ImmutableList.of(
+ TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE));
+ // 1 1.0, null, 100.0
+ builder.getTimeColumnBuilder().writeLong(1 + delta);
+ builder.getColumnBuilder(0).writeDouble(1 + delta);
+ builder.getColumnBuilder(1).appendNull();
+ builder.getColumnBuilder(2).writeDouble(100 + delta);
+ builder.declarePosition();
+ // 2 2.0, 20.0, 200.0
+ builder.getTimeColumnBuilder().writeLong(2 + delta);
+ builder.getColumnBuilder(0).writeDouble(2 + delta);
+ builder.getColumnBuilder(1).writeDouble(20 + delta);
+ builder.getColumnBuilder(2).writeDouble(200 + delta);
+ builder.declarePosition();
+ // 3 3.0, 30.0, null
+ builder.getTimeColumnBuilder().writeLong(3 + delta);
+ builder.getColumnBuilder(0).writeDouble(3 + delta);
+ builder.getColumnBuilder(1).writeDouble(30 + delta);
+ builder.getColumnBuilder(2).appendNull();
+ builder.declarePosition();
+ // 4 null, 40.0, null
+ builder.getTimeColumnBuilder().writeLong(4 + delta);
+ builder.getColumnBuilder(0).appendNull();
+ builder.getColumnBuilder(1).writeDouble(40 + delta);
+ builder.getColumnBuilder(2).appendNull();
+ builder.declarePosition();
+ // 5 null, null, 500.0
+ builder.getTimeColumnBuilder().writeLong(5 + delta);
+ builder.getColumnBuilder(0).appendNull();
+ builder.getColumnBuilder(1).appendNull();
+ builder.getColumnBuilder(2).writeDouble(500 + delta);
+ builder.declarePosition();
+
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+ });
+
+ int count = 0;
+ double[][][] res =
+ new double[][][] {
+ {
+ {1.0, 520.0, 100.0},
+ {2, 20, 200},
+ {3, 30, 520.0},
+ {520.0, 40, 520.0},
+ {520.0, 520.0, 500}
+ },
+ {
+ {10001, 520.0, 10100},
+ {10002, 10020, 10200},
+ {10003, 10030, 520.0},
+ {520.0, 10040, 520.0},
+ {520.0, 520.0, 10500}
+ },
+ {
+ {20001, 520.0, 20100},
+ {20002, 20020, 20200},
+ {20003, 20030, 520.0},
+ {520.0, 20040, 520.0},
+ {520.0, 520.0, 20500}
+ }
+ };
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false}
+ },
+ {
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false}
+ },
+ {
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false}
+ }
+ };
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = i + 1 + count * 10000L;
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 3; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getDouble(i), 0.00001);
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(3, count);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void batchPreviousFillTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, FillOperator.class.getSimpleName());
+
+ IFill[] fillArray =
+ new IFill[] {new IntPreviousFill(), new IntPreviousFill(), new IntPreviousFill()};
+ FillOperator fillOperator =
+ new FillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ int delta = index * 10000;
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ ImmutableList.of(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32));
+ // 1 1, null, 100
+ builder.getTimeColumnBuilder().writeLong(1 + delta);
+ builder.getColumnBuilder(0).writeInt(1 + delta);
+ builder.getColumnBuilder(1).appendNull();
+ builder.getColumnBuilder(2).writeInt(100 + delta);
+ builder.declarePosition();
+ // 2 2, 20, 200
+ builder.getTimeColumnBuilder().writeLong(2 + delta);
+ builder.getColumnBuilder(0).writeInt(2 + delta);
+ builder.getColumnBuilder(1).writeInt(20 + delta);
+ builder.getColumnBuilder(2).writeInt(200 + delta);
+ builder.declarePosition();
+ // 3 3, 30, null
+ builder.getTimeColumnBuilder().writeLong(3 + delta);
+ builder.getColumnBuilder(0).writeInt(3 + delta);
+ builder.getColumnBuilder(1).writeInt(30 + delta);
+ builder.getColumnBuilder(2).appendNull();
+ builder.declarePosition();
+ // 4 null, 40, null
+ builder.getTimeColumnBuilder().writeLong(4 + delta);
+ builder.getColumnBuilder(0).appendNull();
+ builder.getColumnBuilder(1).writeInt(40 + delta);
+ builder.getColumnBuilder(2).appendNull();
+ builder.declarePosition();
+ // 5 null, null, 500
+ builder.getTimeColumnBuilder().writeLong(5 + delta);
+ builder.getColumnBuilder(0).appendNull();
+ builder.getColumnBuilder(1).appendNull();
+ builder.getColumnBuilder(2).writeInt(500 + delta);
+ builder.declarePosition();
+
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+ });
+
+ int count = 0;
+ int[][][] res =
+ new int[][][] {
+ {{1, 0, 100}, {2, 20, 200}, {3, 30, 200}, {3, 40, 200}, {3, 40, 500}},
+ {
+ {10001, 40, 10100},
+ {10002, 10020, 10200},
+ {10003, 10030, 10200},
+ {10003, 10040, 10200},
+ {10003, 10040, 10500}
+ },
+ {
+ {20001, 10040, 20100},
+ {20002, 20020, 20200},
+ {20003, 20030, 20200},
+ {20003, 20040, 20200},
+ {20003, 20040, 20500}
+ }
+ };
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false}
+ },
+ {
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false}
+ },
+ {
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false},
+ {false, false, false}
+ }
+ };
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = i + 1 + count * 10000L;
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 3; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getInt(i));
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(3, count);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
new file mode 100644
index 0000000000..6c17c09d4f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class LinearFillOperatorTest {
+
+ @Test
+ public void batchLinearFillTest1() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+ LinearFill[] fillArray =
+ new LinearFill[] {
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
+ };
+ LinearFillOperator fillOperator =
+ new LinearFillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+ private final float[][][] value =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 0.0f},
+ {21.0f, 22.0f, 0.0f, 0.0f},
+ {0.0f, 32.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 43.0f, 0.0f}
+ },
+ {
+ {51.0f, 0.0f, 53.0f, 0.0f},
+ {61.0f, 62.0f, 63.0f, 0.0f},
+ {71.0f, 72.0f, 0.0f, 74.0f},
+ {0.0f, 82.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 93.0f, 0.0f}
+ },
+ {
+ {101.0f, 0.0f, 103.0f, 0.0f},
+ {111.0f, 112.0f, 113.0f, 114.0f},
+ {121.0f, 122.0f, 0.0f, 124.0f},
+ {0.0f, 132.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 143.0f, 0.0f}
+ }
+ };
+ final boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, true},
+ {false, false, true, true},
+ {true, false, true, true},
+ {true, true, false, true}
+ },
+ {
+ {false, true, false, true},
+ {false, false, false, true},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, false, true}
+ },
+ {
+ {false, true, false, true},
+ {false, false, false, false},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, false, true}
+ }
+ };
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ ImmutableList.of(
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT));
+ for (int i = 0; i < 5; i++) {
+ builder.getTimeColumnBuilder().writeLong(i + index * 5L);
+ for (int j = 0; j < 4; j++) {
+ if (isNull[index][i][j]) {
+ builder.getColumnBuilder(j).appendNull();
+ } else {
+ builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+ }
+ }
+ builder.declarePosition();
+ }
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+ });
+
+ int count = 0;
+ float[][][] res =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 39.0f},
+ {21.0f, 22.0f, 28.0f, 39.0f},
+ {36.0f, 32.0f, 28.0f, 39.0f},
+ {36.0f, 47.0f, 43.0f, 39.0f}
+ },
+ {
+ {51.0f, 47.0f, 53.0f, 39.0f},
+ {61.0f, 62.0f, 63.0f, 39.0f},
+ {71.0f, 72.0f, 78.0f, 74.0f},
+ {86.0f, 82.0f, 78.0f, 94.0f},
+ {86.0f, 97.0f, 93.0f, 94.0f}
+ },
+ {
+ {101.0f, 97.0f, 103.0f, 94.0f},
+ {111.0f, 112.0f, 113.0f, 114.0f},
+ {121.0f, 122.0f, 128.0f, 124.0f},
+ {0.0f, 132.0f, 128.0f, 0.0f},
+ {0.0f, 0.0f, 143.0f, 0.0f}
+ }
+ };
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {true, false, false, true},
+ {true, true, false, true}
+ }
+ };
+
+ boolean[] nullBlock = new boolean[] {true, false, false, false};
+ int nullBlockIndex = 0;
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ assertEquals(nullBlock[nullBlockIndex++], block == null);
+ if (block == null) {
+ continue;
+ }
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = i + count * 5L;
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 4; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 0.00001f);
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(3, count);
+ assertEquals(4, nullBlockIndex);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
+ @Test
+ public void batchLinearFillTest2() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+ LinearFill[] fillArray =
+ new LinearFill[] {
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
+ };
+ LinearFillOperator fillOperator =
+ new LinearFillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+ private final float[][][] value =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 0.0f},
+ {21.0f, 22.0f, 0.0f, 0.0f},
+ {0.0f, 32.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ },
+ {
+ {51.0f, 0.0f, 0.0f, 0.0f},
+ {61.0f, 62.0f, 0.0f, 0.0f},
+ {71.0f, 72.0f, 0.0f, 74.0f},
+ {0.0f, 82.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ },
+ {
+ {101.0f, 0.0f, 103.0f, 0.0f},
+ {111.0f, 112.0f, 0.0f, 114.0f},
+ {121.0f, 122.0f, 0.0f, 124.0f},
+ {0.0f, 132.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ }
+ };
+ final boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, true},
+ {false, false, true, true},
+ {true, false, true, true},
+ {true, true, true, true}
+ },
+ {
+ {false, true, true, true},
+ {false, false, true, true},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, true, true}
+ },
+ {
+ {false, true, false, true},
+ {false, false, true, false},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, true, true}
+ }
+ };
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ ImmutableList.of(
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT));
+ for (int i = 0; i < 5; i++) {
+ builder.getTimeColumnBuilder().writeLong(i + index * 5L);
+ for (int j = 0; j < 4; j++) {
+ if (isNull[index][i][j]) {
+ builder.getColumnBuilder(j).appendNull();
+ } else {
+ builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+ }
+ }
+ builder.declarePosition();
+ }
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+ });
+
+ int count = 0;
+ float[][][] res =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 39.0f},
+ {21.0f, 22.0f, 58.0f, 39.0f},
+ {36.0f, 32.0f, 58.0f, 39.0f},
+ {36.0f, 47.0f, 58.0f, 39.0f}
+ },
+ {
+ {51.0f, 47.0f, 58.0f, 39.0f},
+ {61.0f, 62.0f, 58.0f, 39.0f},
+ {71.0f, 72.0f, 58.0f, 74.0f},
+ {86.0f, 82.0f, 58.0f, 94.0f},
+ {86.0f, 97.0f, 58.0f, 94.0f}
+ },
+ {
+ {101.0f, 97.0f, 103.0f, 94.0f},
+ {111.0f, 112.0f, 0.0f, 114.0f},
+ {121.0f, 122.0f, 0.0f, 124.0f},
+ {0.0f, 132.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ }
+ };
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, true, false},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, true, true}
+ }
+ };
+
+ boolean[] nullBlock = new boolean[] {true, true, false, false, false};
+ int nullBlockIndex = 0;
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ assertEquals(nullBlock[nullBlockIndex++], block == null);
+ if (block == null) {
+ continue;
+ }
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = i + count * 5L;
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 4; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 0.00001f);
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(3, count);
+ assertEquals(5, nullBlockIndex);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+}