You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/07/06 06:30:18 UTC
[iotdb] branch master updated: [IOTDB-3722] Extend Fill function (#6594)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 744fd52bbf [IOTDB-3722] Extend Fill function (#6594)
744fd52bbf is described below
commit 744fd52bbfa7c1603d8aefd690f6adb990f98806
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Wed Jul 6 14:30:12 2022 +0800
[IOTDB-3722] Extend Fill function (#6594)
---
.../iotdb/db/it/query/IoTDBNullValueFillIT.java | 239 ++++++++-------------
.../operator/process/LinearFillOperator.java | 36 +++-
.../operator/process/fill/ILinearFill.java | 57 +++++
.../identity/IdentityFill.java} | 20 +-
.../identity/IdentityLinearFill.java} | 23 +-
.../process/fill/linear/DoubleLinearFill.java | 5 -
.../process/fill/linear/FloatLinearFill.java | 5 -
.../process/fill/linear/IntLinearFill.java | 5 -
.../operator/process/fill/linear/LinearFill.java | 81 +++----
.../process/fill/linear/LongLinearFill.java | 5 -
.../operator/process/merge/AscTimeComparator.java | 5 -
.../operator/process/merge/DescTimeComparator.java | 5 -
.../operator/process/merge/TimeComparator.java | 5 -
.../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 31 ---
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 34 +--
.../execution/operator/LinearFillOperatorTest.java | 152 ++++++++++---
16 files changed, 360 insertions(+), 348 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
index 46f4f9e5b7..5437854454 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
@@ -29,7 +29,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import static org.apache.iotdb.db.it.utils.TestUtils.assertTestFail;
import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualWithDescOrderTest;
@@ -93,6 +92,20 @@ public class IoTDBNullValueFillIT {
"insert into root.sg1.d2(time, s2, s4, s6) values(9, 9, 9.0, 't9')"
};
+ private final String[] expectedHeader =
+ new String[] {
+ "Time",
+ "root.sg1.d1.s1",
+ "root.sg1.d1.s2",
+ "root.sg1.d1.s3",
+ "root.sg1.d1.s4",
+ "root.sg1.d1.s5",
+ "root.sg1.d1.s6"
+ };
+
+ private final String[] expectedAlignByDeviceHeader =
+ new String[] {"Time", "Device", "s1", "s2", "s3", "s4", "s5", "s6"};
+
@BeforeClass
public static void setUp() throws Exception {
EnvFactory.getEnv().initBeforeClass();
@@ -106,16 +119,6 @@ public class IoTDBNullValueFillIT {
@Test
public void previousFillTest() {
- String[] expectedHeader =
- new String[] {
- "Time",
- "root.sg1.d1.s1",
- "root.sg1.d1.s2",
- "root.sg1.d1.s3",
- "root.sg1.d1.s4",
- "root.sg1.d1.s5",
- "root.sg1.d1.s6"
- };
String[] retArray =
new String[] {
"1,null,1,null,1.0,null,t1,",
@@ -133,16 +136,6 @@ public class IoTDBNullValueFillIT {
@Test
public void previousDescFillTest() {
- String[] expectedHeader =
- new String[] {
- "Time",
- "root.sg1.d1.s1",
- "root.sg1.d1.s2",
- "root.sg1.d1.s3",
- "root.sg1.d1.s4",
- "root.sg1.d1.s5",
- "root.sg1.d1.s6"
- };
String[] retArray =
new String[] {
"9,9,null,9.0,null,true,null,",
@@ -162,7 +155,6 @@ public class IoTDBNullValueFillIT {
@Test
public void previousFillAlignByDeviceTest() {
- String[] expectedHeader = new String[] {"Time", "Device", "s1", "s2", "s3", "s4", "s5", "s6"};
String[] retArray =
new String[] {
"1,root.sg1.d1,null,1,null,1.0,null,t1,",
@@ -184,13 +176,12 @@ public class IoTDBNullValueFillIT {
};
resultSetEqualTest(
"select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) align by device",
- expectedHeader,
+ expectedAlignByDeviceHeader,
retArray);
}
@Test
public void previousDescFillAlignByDeviceTest() {
- String[] expectedHeader = new String[] {"Time", "Device", "s1", "s2", "s3", "s4", "s5", "s6"};
String[] retArray =
new String[] {
"9,root.sg1.d1,9,null,9.0,null,true,null,",
@@ -212,181 +203,119 @@ public class IoTDBNullValueFillIT {
};
resultSetEqualTest(
"select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) order by time desc align by device",
- expectedHeader,
+ expectedAlignByDeviceHeader,
retArray);
}
@Test
public void linearFillTest() {
- String[] expectedHeader =
- new String[] {
- "Time", "root.sg1.d1.s1", "root.sg1.d1.s2", "root.sg1.d1.s3", "root.sg1.d1.s4"
- };
String[] retArray =
new String[] {
- "1,null,1,null,1.0,",
- "2,2,2,2.0,2.0,",
- "3,3,3,3.0,3.0,",
- "4,4,4,4.0,4.0,",
- "5,5,5,5.0,5.0,",
- "6,6,6,6.5,6.0,",
- "8,8,8,8.0,8.0,",
- "9,9,null,9.0,null,"
+ "1,null,1,null,1.0,null,t1,",
+ "2,2,2,2.0,2.0,true,t2,",
+ "3,3,3,3.0,3.0,false,null,",
+ "4,4,4,4.0,4.0,null,t4,",
+ "5,5,5,5.0,5.0,false,t5,",
+ "6,6,6,6.5,6.0,null,t6,",
+ "8,8,8,8.0,8.0,true,t8,",
+ "9,9,null,9.0,null,true,null,"
};
resultSetEqualWithDescOrderTest(
- "select s1, s2, s3, s4 from root.sg1.d1 fill(linear)", expectedHeader, retArray);
+ "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(linear)", expectedHeader, retArray);
}
@Test
public void linearFillAlignByDeviceTest() {
- assertTestFail(
- "select s1, s2, s3, s4 from root.sg1.d1 fill(linear) align by device",
- "Linear fill is not supported in align by device query yet.");
- }
-
- @Test
- public void linearFillDataTypeMisMatchTest() {
- assertTestFail(
- "select s1, s5 from root.sg1.d1 fill(linear)",
- "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support linear fill.");
- assertTestFail(
- "select s1, s6 from root.sg1.d1 fill(linear)",
- "Data type mismatch: column 'root.sg1.d1.s6' (dataType 'TEXT') doesn't support linear fill.");
+ String[] retArray =
+ new String[] {
+ "1,root.sg1.d1,null,1,null,1.0,null,t1,",
+ "2,root.sg1.d1,2,2,2.0,2.0,true,t2,",
+ "3,root.sg1.d1,3,3,3.0,3.0,false,null,",
+ "4,root.sg1.d1,4,4,4.0,4.0,null,t4,",
+ "5,root.sg1.d1,5,5,5.0,5.0,false,t5,",
+ "6,root.sg1.d1,6,6,6.5,6.0,null,t6,",
+ "8,root.sg1.d1,8,8,8.0,8.0,true,t8,",
+ "9,root.sg1.d1,9,5,9.0,5.0,true,null,",
+ "1,root.sg1.d2,1,5,1.0,5.0,true,null,",
+ "2,root.sg1.d2,2,2,2.0,2.0,true,t2,",
+ "3,root.sg1.d2,3,3,3.0,3.0,null,t3,",
+ "4,root.sg1.d2,4,4,4.0,4.0,false,null,",
+ "5,root.sg1.d2,5,5,5.0,5.0,false,t5,",
+ "6,root.sg1.d2,6,6,6.0,6.5,false,null,",
+ "8,root.sg1.d2,8,8,8.0,8.0,true,t8,",
+ "9,root.sg1.d2,null,9,null,9.0,null,t9,"
+ };
+ resultSetEqualTest(
+ "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(linear) align by device",
+ expectedAlignByDeviceHeader,
+ retArray);
}
@Test
public void intFillTest() {
- String[] expectedHeader =
- new String[] {
- "Time",
- "root.sg1.d1.s1",
- "root.sg1.d1.s2",
- "root.sg1.d1.s3",
- "root.sg1.d1.s4",
- "root.sg1.d1.s6"
- };
String[] retArray =
new String[] {
- "1,1000,1,1000.0,1.0,t1,",
- "2,2,2,2.0,2.0,t2,",
- "3,3,1000,3.0,1000.0,1000,",
- "4,1000,4,1000.0,4.0,t4,",
- "5,5,5,5.0,5.0,t5,",
- "6,1000,6,1000.0,6.0,t6,",
- "8,8,8,8.0,8.0,t8,",
- "9,9,1000,9.0,1000.0,1000,"
+ "1,1000,1,1000.0,1.0,null,t1,",
+ "2,2,2,2.0,2.0,true,t2,",
+ "3,3,1000,3.0,1000.0,false,1000,",
+ "4,1000,4,1000.0,4.0,null,t4,",
+ "5,5,5,5.0,5.0,false,t5,",
+ "6,1000,6,1000.0,6.0,null,t6,",
+ "8,8,8,8.0,8.0,true,t8,",
+ "9,9,1000,9.0,1000.0,true,1000,"
};
resultSetEqualWithDescOrderTest(
- "select s1, s2, s3, s4, s6 from root.sg1.d1 fill(1000)", expectedHeader, retArray);
- }
-
- @Test
- public void intFillDataTypeMisMatchTest() {
- assertTestFail(
- "select s1, s5 from root.sg1.d1 fill(1000)",
- "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support fill with '1000' (dataType 'INT64').");
+ "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(1000)", expectedHeader, retArray);
}
@Test
public void floatFillTest() {
- String[] expectedHeader =
- new String[] {"Time", "root.sg1.d1.s3", "root.sg1.d1.s4", "root.sg1.d1.s6"};
String[] retArray =
new String[] {
- "1,3.14,1.0,t1,",
- "2,2.0,2.0,t2,",
- "3,3.0,3.14,3.14,",
- "4,3.14,4.0,t4,",
- "5,5.0,5.0,t5,",
- "6,3.14,6.0,t6,",
- "8,8.0,8.0,t8,",
- "9,9.0,3.14,3.14,"
+ "1,null,1,3.14,1.0,null,t1,",
+ "2,2,2,2.0,2.0,true,t2,",
+ "3,3,null,3.0,3.14,false,3.14,",
+ "4,null,4,3.14,4.0,null,t4,",
+ "5,5,5,5.0,5.0,false,t5,",
+ "6,null,6,3.14,6.0,null,t6,",
+ "8,8,8,8.0,8.0,true,t8,",
+ "9,9,null,9.0,3.14,true,3.14,"
};
resultSetEqualWithDescOrderTest(
- "select s3, s4, s6 from root.sg1.d1 fill(3.14)", expectedHeader, retArray);
- }
-
- @Test
- public void floatFillDataTypeMisMatchTest() {
- assertTestFail(
- "select s1, s3 from root.sg1.d1 fill(3.14)",
- "Data type mismatch: column 'root.sg1.d1.s1' (dataType 'INT32') doesn't support fill with '3.14' (dataType 'DOUBLE').");
- assertTestFail(
- "select s2, s3 from root.sg1.d1 fill(3.14)",
- "Data type mismatch: column 'root.sg1.d1.s2' (dataType 'INT64') doesn't support fill with '3.14' (dataType 'DOUBLE').");
- assertTestFail(
- "select s5, s3 from root.sg1.d1 fill(3.14)",
- "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support fill with '3.14' (dataType 'DOUBLE').");
+ "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(3.14)", expectedHeader, retArray);
}
@Test
public void booleanFillTest() {
- String[] expectedHeader = new String[] {"Time", "root.sg1.d1.s5", "root.sg1.d1.s6"};
String[] retArray =
new String[] {
- "1,true,t1,",
- "2,true,t2,",
- "3,false,true,",
- "4,true,t4,",
- "5,false,t5,",
- "6,true,t6,",
- "8,true,t8,",
- "9,true,true,"
+ "1,null,1,null,1.0,true,t1,",
+ "2,2,2,2.0,2.0,true,t2,",
+ "3,3,null,3.0,null,false,true,",
+ "4,null,4,null,4.0,true,t4,",
+ "5,5,5,5.0,5.0,false,t5,",
+ "6,null,6,null,6.0,true,t6,",
+ "8,8,8,8.0,8.0,true,t8,",
+ "9,9,null,9.0,null,true,true,"
};
resultSetEqualWithDescOrderTest(
- "select s5, s6 from root.sg1.d1 fill(true)", expectedHeader, retArray);
- }
-
- @Test
- public void booleanFillDataTypeMisMatchTest() {
- assertTestFail(
- "select s5, s1 from root.sg1.d1 fill(true)",
- "Data type mismatch: column 'root.sg1.d1.s1' (dataType 'INT32') doesn't support fill with 'true' (dataType 'BOOLEAN').");
- assertTestFail(
- "select s5, s2 from root.sg1.d1 fill(true)",
- "Data type mismatch: column 'root.sg1.d1.s2' (dataType 'INT64') doesn't support fill with 'true' (dataType 'BOOLEAN').");
- assertTestFail(
- "select s5, s3 from root.sg1.d1 fill(true)",
- "Data type mismatch: column 'root.sg1.d1.s3' (dataType 'FLOAT') doesn't support fill with 'true' (dataType 'BOOLEAN').");
- assertTestFail(
- "select s5, s4 from root.sg1.d1 fill(true)",
- "Data type mismatch: column 'root.sg1.d1.s4' (dataType 'DOUBLE') doesn't support fill with 'true' (dataType 'BOOLEAN').");
+ "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill(true)", expectedHeader, retArray);
}
@Test
public void textFillTest() {
- String[] expectedHeader = new String[] {"Time", "root.sg1.d1.s6", "root.sg1.d2.s6"};
String[] retArray =
new String[] {
- "1,t1,t0,",
- "2,t2,t2,",
- "3,t0,t3,",
- "4,t4,t0,",
- "5,t5,t5,",
- "6,t6,t0,",
- "8,t8,t8,",
- "9,t0,t9,"
+ "1,null,1,null,1.0,null,t1,",
+ "2,2,2,2.0,2.0,true,t2,",
+ "3,3,null,3.0,null,false,t0,",
+ "4,null,4,null,4.0,null,t4,",
+ "5,5,5,5.0,5.0,false,t5,",
+ "6,null,6,null,6.0,null,t6,",
+ "8,8,8,8.0,8.0,true,t8,",
+ "9,9,null,9.0,null,true,t0,"
};
resultSetEqualWithDescOrderTest(
- "select s6 from root.sg1.d1, root.sg1.d2 fill('t0')", expectedHeader, retArray);
- }
-
- @Test
- public void textFillDataTypeMisMatchTest() {
- assertTestFail(
- "select s6, s1 from root.sg1.d1 fill('t0')",
- "Data type mismatch: column 'root.sg1.d1.s1' (dataType 'INT32') doesn't support fill with 't0' (dataType 'TEXT').");
- assertTestFail(
- "select s6, s2 from root.sg1.d1 fill('t0')",
- "Data type mismatch: column 'root.sg1.d1.s2' (dataType 'INT64') doesn't support fill with 't0' (dataType 'TEXT').");
- assertTestFail(
- "select s6, s3 from root.sg1.d1 fill('t0')",
- "Data type mismatch: column 'root.sg1.d1.s3' (dataType 'FLOAT') doesn't support fill with 't0' (dataType 'TEXT').");
- assertTestFail(
- "select s6, s4 from root.sg1.d1 fill('t0')",
- "Data type mismatch: column 'root.sg1.d1.s4' (dataType 'DOUBLE') doesn't support fill with 't0' (dataType 'TEXT').");
- assertTestFail(
- "select s6, s5 from root.sg1.d1 fill('t0')",
- "Data type mismatch: column 'root.sg1.d1.s5' (dataType 'BOOLEAN') doesn't support fill with 't0' (dataType 'TEXT').");
+ "select s1, s2, s3, s4, s5, s6 from root.sg1.d1 fill('t0')", expectedHeader, retArray);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 8193e20684..fcae23ce7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -37,11 +37,15 @@ import static java.util.Objects.requireNonNull;
public class LinearFillOperator implements ProcessOperator {
private final OperatorContext operatorContext;
- private final LinearFill[] fillArray;
+ private final ILinearFill[] fillArray;
private final Operator child;
private final int outputColumnCount;
// TODO need to spill it to disk if it consumes too much memory
private final List<TsBlock> cachedTsBlock;
+
+ private final List<Long> cachedRowIndex;
+
+ private long currentRowIndex = 0;
// next TsBlock Index for each Column
private final int[] nextTsBlockIndex;
@@ -52,7 +56,7 @@ public class LinearFillOperator implements ProcessOperator {
private boolean noMoreTsBlock;
public LinearFillOperator(
- OperatorContext operatorContext, LinearFill[] fillArray, Operator child) {
+ OperatorContext operatorContext, ILinearFill[] fillArray, Operator child) {
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
checkArgument(
fillArray != null && fillArray.length > 0, "fillArray should not be null or empty");
@@ -60,6 +64,7 @@ public class LinearFillOperator implements ProcessOperator {
this.child = requireNonNull(child, "child operator is null");
this.outputColumnCount = fillArray.length;
this.cachedTsBlock = new ArrayList<>();
+ this.cachedRowIndex = new ArrayList<>();
this.nextTsBlockIndex = new int[outputColumnCount];
Arrays.fill(this.nextTsBlockIndex, 1);
this.canCallNext = false;
@@ -88,19 +93,21 @@ public class LinearFillOperator implements ProcessOperator {
return nextTsBlock;
} else { // otherwise, we cache it
cachedTsBlock.add(nextTsBlock);
+ cachedRowIndex.add(currentRowIndex);
+ currentRowIndex += nextTsBlock.getPositionCount();
}
}
TsBlock originTsBlock = cachedTsBlock.get(0);
- long currentEndTime = originTsBlock.getEndTime();
+ long currentEndRowIndex = cachedRowIndex.get(0) + originTsBlock.getPositionCount() - 1;
// Step 1: judge whether we can fill current TsBlock, if TsBlock that we can get is not enough,
// we just return null
for (int columnIndex = 0; columnIndex < outputColumnCount; columnIndex++) {
// current valueColumn can't be filled using current information
if (fillArray[columnIndex].needPrepareForNext(
- currentEndTime, originTsBlock.getColumn(columnIndex))) {
+ currentEndRowIndex, originTsBlock.getColumn(columnIndex))) {
// current cached TsBlock is not enough to fill this column
- while (!isCachedTsBlockEnough(columnIndex, currentEndTime)) {
+ while (!isCachedTsBlockEnough(columnIndex, currentEndRowIndex)) {
// if we failed to get next TsBlock
if (!tryToGetNextTsBlock()) {
// there is no more TsBlock, so we have to fill this Column
@@ -118,9 +125,12 @@ public class LinearFillOperator implements ProcessOperator {
}
// Step 2: fill current TsBlock
originTsBlock = cachedTsBlock.remove(0);
+ long startRowIndex = cachedRowIndex.remove(0);
Column[] columns = new Column[outputColumnCount];
for (int i = 0; i < outputColumnCount; i++) {
- columns[i] = fillArray[i].fill(originTsBlock.getTimeColumn(), originTsBlock.getColumn(i));
+ columns[i] =
+ fillArray[i].fill(
+ originTsBlock.getTimeColumn(), originTsBlock.getColumn(i), startRowIndex);
}
TsBlock result =
new TsBlock(originTsBlock.getPositionCount(), originTsBlock.getTimeColumn(), columns);
@@ -154,17 +164,21 @@ public class LinearFillOperator implements ProcessOperator {
* Judge whether we can use current cached TsBlock to fill Column
*
* @param columnIndex index for column which need to be filled
- * @param currentEndTime endTime of column which need to be filled
+ * @param currentEndRowIndex row index for endTime of column which needs to be filled
* @return true if current cached TsBlock is enough to fill Column at columnIndex, otherwise
* false.
*/
- private boolean isCachedTsBlockEnough(int columnIndex, long currentEndTime) {
+ private boolean isCachedTsBlockEnough(int columnIndex, long currentEndRowIndex) {
// next TsBlock has already been in the cachedTsBlock
while (nextTsBlockIndex[columnIndex] < cachedTsBlock.size()) {
TsBlock nextTsBlock = cachedTsBlock.get(nextTsBlockIndex[columnIndex]);
+ long startRowIndex = cachedRowIndex.get(nextTsBlockIndex[columnIndex]);
nextTsBlockIndex[columnIndex]++;
if (fillArray[columnIndex].prepareForNext(
- currentEndTime, nextTsBlock.getTimeColumn(), nextTsBlock.getColumn(columnIndex))) {
+ startRowIndex,
+ currentEndRowIndex,
+ nextTsBlock.getTimeColumn(),
+ nextTsBlock.getColumn(columnIndex))) {
return true;
}
}
@@ -184,6 +198,8 @@ public class LinearFillOperator implements ProcessOperator {
return false;
} else { // otherwise, we cache it
cachedTsBlock.add(nextTsBlock);
+ cachedRowIndex.add(currentRowIndex);
+ currentRowIndex += nextTsBlock.getPositionCount();
return true;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
new file mode 100644
index 0000000000..d0edfc5961
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public interface ILinearFill {
+
+ /**
+ * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
+ * has been set to true
+ *
+ * @param timeColumn TimeColumn of valueColumn
+ * @param valueColumn valueColumn that need to be filled
+ * @param currentRowIndex current row index for start time in timeColumn
+ * @return Value Column that has been filled
+ */
+ Column fill(TimeColumn timeColumn, Column valueColumn, long currentRowIndex);
+
+ /**
+ * @param rowIndex row index for end time of current valueColumn that need to be filled
+ * @param valueColumn valueColumn that need to be filled
+ * @return true if valueColumn can't be filled using current information, and we need to get next
+ * TsBlock and then call prepareForNext. false if valueColumn can be filled using current
+ * information, and we can directly call fill() function
+ */
+ boolean needPrepareForNext(long rowIndex, Column valueColumn);
+
+ /**
+ * @param startRowIndex row index for start time of nextValueColumn
+ * @param endRowIndex row index for end time of current valueColumn that need to be filled
+ * @param nextTimeColumn TimeColumn of next TsBlock
+ * @param nextValueColumn Value Column of next TsBlock
+ * @return true if we get enough information to fill current column, and we can stop getting next
+ * TsBlock and calling prepareForNext. false if we still don't get enough information to fill
+ * current column, and still need to keep getting next TsBlock and then call prepareForNext
+ */
+ boolean prepareForNext(
+ long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column nextValueColumn);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
index 4aa723ad97..25a5264bb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
@@ -16,23 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity;
-public class AscTimeComparator implements TimeComparator {
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
- /** @return if order by time asc, return true if time <= endTime, otherwise false */
- @Override
- public boolean satisfyCurEndTime(long time, long endTime) {
- return time <= endTime;
- }
-
- @Override
- public long getCurrentEndTime(long time1, long time2) {
- return Math.min(time1, time2);
- }
+public class IdentityFill implements IFill {
@Override
- public boolean inFillBound(long time, long timeBound) {
- return time < timeBound;
+ public Column fill(Column valueColumn) {
+ return valueColumn;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
index 4aa723ad97..34f329baf0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
@@ -16,23 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity;
-public class AscTimeComparator implements TimeComparator {
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class IdentityLinearFill implements ILinearFill {
- /** @return if order by time asc, return true if time <= endTime, otherwise false */
@Override
- public boolean satisfyCurEndTime(long time, long endTime) {
- return time <= endTime;
+ public Column fill(TimeColumn timeColumn, Column valueColumn, long currentRowIndex) {
+ return valueColumn;
}
@Override
- public long getCurrentEndTime(long time1, long time2) {
- return Math.min(time1, time2);
+ public boolean needPrepareForNext(long rowIndex, Column valueColumn) {
+ return false;
}
@Override
- public boolean inFillBound(long time, long timeBound) {
- return time < timeBound;
+ public boolean prepareForNext(
+ long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column nextValueColumn) {
+ throw new IllegalArgumentException(
+ "We won't call prepareForNext in IdentityLinearFill, because needPrepareForNext() method will always return false.");
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
index ca8f558647..52a228b1e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -34,10 +33,6 @@ public class DoubleLinearFill extends LinearFill {
private double nextValueInCurrentColumn;
- public DoubleLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((double[]) array)[index] = column.getDouble(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
index 803c55ec8f..a32c60d61e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -34,10 +33,6 @@ public class FloatLinearFill extends LinearFill {
private float nextValueInCurrentColumn;
- public FloatLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((float[]) array)[index] = column.getFloat(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
index 4876ed23b8..baf4806551 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -34,10 +33,6 @@ public class IntLinearFill extends LinearFill {
private int nextValueInCurrentColumn;
- public IntLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((int[]) array)[index] = column.getInt(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index 92685b9c38..238fb91137 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -31,32 +31,18 @@ import static com.google.common.base.Preconditions.checkArgument;
* the closest timestamp after T. Linear Fill function calculation only supports numeric types
* including long, int, double and float.
*/
-public abstract class LinearFill {
-
- private final TimeComparator timeComparator;
+public abstract class LinearFill implements ILinearFill {
// whether previous value is null
protected boolean previousIsNull = true;
- // time of next value
- protected long nextTime;
-
- protected long nextTimeInCurrentColumn;
- public LinearFill(boolean ascending, TimeComparator timeComparator) {
- this.nextTime = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
- this.nextTimeInCurrentColumn = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
- this.timeComparator = timeComparator;
- }
+ // next row index which is not null
+ private long nextRowIndex = -1;
+ // next row index in current column which is not null
+ private long nextRowIndexInCurrentColumn = -1;
- /**
- * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
- * has been set to true
- *
- * @param timeColumn TimeColumn of valueColumn
- * @param valueColumn valueColumn that need to be filled
- * @return Value Column that has been filled
- */
- public Column fill(TimeColumn timeColumn, Column valueColumn) {
+ @Override
+ public Column fill(TimeColumn timeColumn, Column valueColumn, long startRowIndex) {
int size = valueColumn.getPositionCount();
// if this valueColumn is empty, just return itself;
if (size == 0) {
@@ -74,11 +60,13 @@ public abstract class LinearFill {
// if its values are all null
if (valueColumn instanceof RunLengthEncodedColumn) {
// previous value is null or next value is null, we just return NULL_VALUE_BLOCK
- if (previousIsNull || timeComparator.inFillBound(nextTime, timeColumn.getStartTime())) {
+ if (previousIsNull || nextRowIndex < startRowIndex) {
return new RunLengthEncodedColumn(createNullValueColumn(), size);
} else {
prepareForNextValueInCurrentColumn(
- timeColumn.getEndTime(), timeColumn.getPositionCount() - 1, timeColumn, valueColumn);
+ startRowIndex + timeColumn.getPositionCount() - 1,
+ timeColumn.getPositionCount(),
+ valueColumn);
return new RunLengthEncodedColumn(createFilledValueColumn(), size);
}
} else {
@@ -90,10 +78,10 @@ public abstract class LinearFill {
for (int i = 0; i < size; i++) {
// current value is null, we need to fill it
if (valueColumn.isNull(i)) {
- long currentTime = timeColumn.getLong(i);
- prepareForNextValueInCurrentColumn(currentTime, i + 1, timeColumn, valueColumn);
+ long currentRowIndex = startRowIndex + i;
+ prepareForNextValueInCurrentColumn(currentRowIndex, i + 1, valueColumn);
// we don't fill it, if either previous value or next value is null
- if (previousIsNull || nextIsNull(currentTime)) {
+ if (previousIsNull || nextIsNull(currentRowIndex)) {
isNull[i] = true;
hasNullValue = true;
} else {
@@ -113,63 +101,56 @@ public abstract class LinearFill {
}
/**
- * @param time end time of current valueColumn that need to be filled
+ * @param rowIndex end row index of current valueColumn that need to be filled
* @param valueColumn valueColumn that need to be filled
* @return true if valueColumn can't be filled using current information, and we need to get next
* TsBlock and then call prepareForNext. false if valueColumn can be filled using current
* information, and we can directly call fill() function
*/
- public boolean needPrepareForNext(long time, Column valueColumn) {
- return timeComparator.inFillBound(nextTime, time)
- && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+ @Override
+ public boolean needPrepareForNext(long rowIndex, Column valueColumn) {
+ return nextRowIndex < rowIndex && valueColumn.isNull(valueColumn.getPositionCount() - 1);
}
- /**
- * @param time end time of current valueColumn that need to be filled
- * @param nextTimeColumn TimeColumn of next TsBlock
- * @param nextValueColumn Value Column of next TsBlock
- * @return true if we get enough information to fill current column, and we can stop getting next
- * TsBlock and calling prepareForNext. false if we still don't get enough information to fill
- * current column, and still need to keep getting next TsBlock and then call prepareForNext
- */
- public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
+ @Override
+ public boolean prepareForNext(
+ long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column nextValueColumn) {
checkArgument(
- nextTimeColumn.getPositionCount() > 0
- && timeComparator.inFillBound(time, nextTimeColumn.getLong(0)),
+ nextTimeColumn.getPositionCount() > 0 && endRowIndex < startRowIndex,
"nextColumn's time should be greater than current time");
- if (timeComparator.satisfyCurEndTime(time, nextTime)) {
+ if (endRowIndex <= nextRowIndex) {
return true;
}
for (int i = 0; i < nextValueColumn.getPositionCount(); i++) {
if (!nextValueColumn.isNull(i)) {
updateNextValue(nextValueColumn, i);
- this.nextTime = nextTimeColumn.getLong(i);
+ this.nextRowIndex = startRowIndex + i;
return true;
}
}
return false;
}
- private boolean nextIsNull(long time) {
- return timeComparator.satisfyCurEndTime(nextTimeInCurrentColumn, time);
+ private boolean nextIsNull(long rowIndex) {
+ return nextRowIndexInCurrentColumn <= rowIndex;
}
private void prepareForNextValueInCurrentColumn(
- long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
- if (timeComparator.satisfyCurEndTime(time, nextTimeInCurrentColumn)) {
+ long currentRowIndex, int startIndex, Column valueColumn) {
+ if (currentRowIndex <= nextRowIndexInCurrentColumn) {
return;
}
for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
- this.nextTimeInCurrentColumn = timeColumn.getLong(i);
+ this.nextRowIndexInCurrentColumn = currentRowIndex + (i - startIndex + 1);
updateNextValueInCurrentColumn(valueColumn, i);
return;
}
}
// current column's value is not enough for filling, we should use value of next Column
- this.nextTimeInCurrentColumn = this.nextTime;
+ this.nextRowIndexInCurrentColumn = this.nextRowIndex;
updateNextValueInCurrentColumn();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
index 5f6986ad2e..04dba1613a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -34,10 +33,6 @@ public class LongLinearFill extends LinearFill {
private long nextValueInCurrentColumn;
- public LongLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((long[]) array)[index] = column.getLong(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
index 4aa723ad97..95b7316844 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
@@ -30,9 +30,4 @@ public class AscTimeComparator implements TimeComparator {
public long getCurrentEndTime(long time1, long time2) {
return Math.min(time1, time2);
}
-
- @Override
- public boolean inFillBound(long time, long timeBound) {
- return time < timeBound;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
index 495cd2852a..f53c97d8fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
@@ -30,9 +30,4 @@ public class DescTimeComparator implements TimeComparator {
public long getCurrentEndTime(long time1, long time2) {
return Math.max(time1, time2);
}
-
- @Override
- public boolean inFillBound(long time, long timeBound) {
- return time > timeBound;
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
index a2f2c076da..db017f6186 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
@@ -25,9 +25,4 @@ public interface TimeComparator {
/** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */
long getCurrentEndTime(long time1, long time2);
-
- /**
- * @return true if time < timeBound && order by time asc, time > timeBound && order by time desc
- */
- boolean inFillBound(long time, long timeBound);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 2af7e16daa..8a9a063865 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -49,7 +49,6 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
@@ -412,36 +411,6 @@ public class Analyzer {
FillComponent fillComponent = queryStatement.getFillComponent();
List<Expression> fillColumnList =
outputExpressions.stream().map(Pair::getLeft).distinct().collect(Collectors.toList());
- if (fillComponent.getFillPolicy() == FillPolicy.VALUE) {
- for (Expression fillColumn : fillColumnList) {
- TSDataType checkedDataType = typeProvider.getType(fillColumn.getExpressionString());
- if (!fillComponent.getFillValue().isDataTypeConsistency(checkedDataType)) {
- throw new SemanticException(
- String.format(
- "Data type mismatch: column '%s' (dataType '%s') doesn't support fill with '%s' (dataType '%s').",
- fillColumn.getExpressionString(),
- checkedDataType,
- fillComponent.getFillValue().getBinary(),
- fillComponent.getFillValue().getDataTypeString()));
- }
- }
- } else if (fillComponent.getFillPolicy() == FillPolicy.LINEAR) {
- // TODO support linear fill in align by device query
- if (queryStatement.isAlignByDevice()) {
- throw new SemanticException(
- "Linear fill is not supported in align by device query yet.");
- }
-
- for (Expression fillColumn : fillColumnList) {
- TSDataType checkedDataType = typeProvider.getType(fillColumn.getExpressionString());
- if (!checkedDataType.isNumeric()) {
- throw new SemanticException(
- String.format(
- "Data type mismatch: column '%s' (dataType '%s') doesn't support linear fill.",
- fillColumn.getExpressionString(), checkedDataType));
- }
- }
- }
analysis.setFillDescriptor(
new FillDescriptor(fillComponent.getFillPolicy(), fillComponent.getFillValue()));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index ce8afcf154..343c328eea 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -59,16 +59,18 @@ import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.FloatConstantFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.IntConstantFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.LongConstantFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.DoubleLinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.IntLinearFill;
-import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LongLinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BinaryPreviousFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BooleanPreviousFill;
@@ -180,6 +182,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
/**
* Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
@@ -197,6 +200,10 @@ public class LocalExecutionPlanner {
private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
+ private static final IdentityFill IDENTITY_FILL = new IdentityFill();
+
+ private static final IdentityLinearFill IDENTITY_LINEAR_FILL = new IdentityLinearFill();
+
public static LocalExecutionPlanner getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -664,8 +671,7 @@ public class LocalExecutionPlanner {
context.getNextOperatorId(),
node.getPlanNodeId(),
LinearFillOperator.class.getSimpleName()),
- getLinearFill(
- inputColumns, inputDataTypes, node.getScanOrder() == OrderBy.TIMESTAMP_ASC),
+ getLinearFill(inputColumns, inputDataTypes),
child);
default:
throw new IllegalArgumentException("Unknown fill policy: " + fillPolicy);
@@ -676,6 +682,10 @@ public class LocalExecutionPlanner {
int inputColumns, List<TSDataType> inputDataTypes, Literal literal) {
IFill[] constantFill = new IFill[inputColumns];
for (int i = 0; i < inputColumns; i++) {
+ if (!literal.isDataTypeConsistency(inputDataTypes.get(i))) {
+ constantFill[i] = IDENTITY_FILL;
+ continue;
+ }
switch (inputDataTypes.get(i)) {
case BOOLEAN:
constantFill[i] = new BooleanConstantFill(literal.getBoolean());
@@ -731,28 +741,26 @@ public class LocalExecutionPlanner {
return previousFill;
}
- private LinearFill[] getLinearFill(
- int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) {
- LinearFill[] linearFill = new LinearFill[inputColumns];
- TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
+ private ILinearFill[] getLinearFill(int inputColumns, List<TSDataType> inputDataTypes) {
+ ILinearFill[] linearFill = new ILinearFill[inputColumns];
for (int i = 0; i < inputColumns; i++) {
switch (inputDataTypes.get(i)) {
case INT32:
- linearFill[i] = new IntLinearFill(ascending, timeComparator);
+ linearFill[i] = new IntLinearFill();
break;
case INT64:
- linearFill[i] = new LongLinearFill(ascending, timeComparator);
+ linearFill[i] = new LongLinearFill();
break;
case FLOAT:
- linearFill[i] = new FloatLinearFill(ascending, timeComparator);
+ linearFill[i] = new FloatLinearFill();
break;
case DOUBLE:
- linearFill[i] = new DoubleLinearFill(ascending, timeComparator);
+ linearFill[i] = new DoubleLinearFill();
break;
case BOOLEAN:
case TEXT:
- throw new UnsupportedOperationException(
- "DataType: " + inputDataTypes.get(i) + " doesn't support linear fill.");
+ linearFill[i] = IDENTITY_LINEAR_FILL;
+ break;
default:
throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i));
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 9852707056..56c2fe6a1a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -25,11 +25,10 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -42,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class LinearFillOperatorTest {
@@ -62,14 +62,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = true;
- TimeComparator timeComparator = new AscTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -262,14 +260,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = false;
- TimeComparator timeComparator = new DescTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -462,14 +458,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = true;
- TimeComparator timeComparator = new AscTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -662,14 +656,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = false;
- TimeComparator timeComparator = new DescTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -862,8 +854,7 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- LinearFill[] fillArray =
- new LinearFill[] {new FloatLinearFill(true, new AscTimeComparator())};
+ LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
LinearFillOperator fillOperator =
new LinearFillOperator(
fragmentInstanceContext.getOperatorContexts().get(0),
@@ -968,8 +959,7 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- LinearFill[] fillArray =
- new LinearFill[] {new FloatLinearFill(false, new DescTimeComparator())};
+ LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
LinearFillOperator fillOperator =
new LinearFillOperator(
fragmentInstanceContext.getOperatorContexts().get(0),
@@ -1057,4 +1047,104 @@ public class LinearFillOperatorTest {
instanceNotificationExecutor.shutdown();
}
}
+
+ @Test
+ public void batchLinearFillBooleanTest() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+ ILinearFill[] fillArray = new ILinearFill[] {new IdentityLinearFill()};
+ LinearFillOperator fillOperator =
+ new LinearFillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+ private final boolean[][][] value =
+ new boolean[][][] {
+ {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, {{false}}, {{true}}
+ };
+ final boolean[][][] isNull =
+ new boolean[][][] {
+ {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}}
+ };
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.BOOLEAN));
+ for (int i = 0; i < 1; i++) {
+ builder.getTimeColumnBuilder().writeLong(i + index);
+ for (int j = 0; j < 1; j++) {
+ if (isNull[index][i][j]) {
+ builder.getColumnBuilder(j).appendNull();
+ } else {
+ builder.getColumnBuilder(j).writeBoolean(value[index][i][j]);
+ }
+ }
+ builder.declarePosition();
+ }
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 7;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 7;
+ }
+ });
+
+ int count = 0;
+ boolean[][][] res =
+ new boolean[][][] {
+ {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, {{false}}, {{true}}
+ };
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}}
+ };
+
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ assertNotNull(block);
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = i + count;
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 1; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getBoolean(i));
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(res.length, count);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}