You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/25 11:20:31 UTC
[iotdb] 01/01: Implement MultiColumnMerger to support querying data of one timeseries distributed on different DataNodes
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c1f76ec2bfa611afa71d3c934c608dec0cb44c30
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Apr 25 19:20:14 2022 +0800
Implement MultiColumnMerger to support querying data of one timeseries distributed on different DataNodes
---
.../mpp/operator/process/merge/ColumnMerger.java | 4 +-
.../operator/process/merge/MultiColumnMerger.java | 96 +++++++++++
.../operator/process/merge/SingleColumnMerger.java | 66 +++++++-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 2 +-
.../iotdb/db/mpp/execution/DataDriverTest.java | 4 +-
.../iotdb/db/mpp/operator/LimitOperatorTest.java | 4 +-
.../db/mpp/operator/MultiColumnMergerTest.java | 185 +++++++++++++++++++++
.../db/mpp/operator/SingleColumnMergerTest.java | 96 ++++++++++-
.../db/mpp/operator/TimeJoinOperatorTest.java | 10 +-
9 files changed, 446 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java
index 04ed2bb09e..0f325ac3bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java
@@ -41,7 +41,9 @@ public interface ColumnMerger {
* merge columns belonging to same series into one column
*
* @param inputTsBlocks all source TsBlocks, some of which will contain source column
- * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks
+ * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
+ * we should only read from this array and not update it because others will use the start
+ * index value in inputIndex array
* @param updatedInputIndex current index for each source TsBlock after merging
* @param timeBuilder result time column, which is already generated and used to indicate each
* row's timestamp
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/MultiColumnMerger.java
new file mode 100644
index 0000000000..2ff29d06be
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/MultiColumnMerger.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.process.merge;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+
+/** merges more than one input column into one output column; these columns' time must not overlap */
+public class MultiColumnMerger implements ColumnMerger {
+
+ private final List<InputLocation> inputLocations;
+
+ public MultiColumnMerger(List<InputLocation> inputLocations) {
+ this.inputLocations = inputLocations;
+ }
+
+ @Override
+ public void mergeColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ TimeColumnBuilder timeBuilder,
+ long currentEndTime, // not read by this implementation
+ ColumnBuilder columnBuilder) {
+ int rowCount = timeBuilder.getPositionCount();
+
+ // copy each input location's start index into updatedInputIndex; inputIndex is only read
+ for (InputLocation inputLocation : inputLocations) {
+ int tsBlockIndex = inputLocation.getTsBlockIndex();
+ updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex];
+ }
+
+ for (int i = 0; i < rowCount; i++) {
+ // record whether an entry (a value or a null) has already been appended for current row
+ boolean appendValue = false;
+ // we don't use MinHeap here to choose the right column, because inputLocations.size() won't
+ // be very large.
+ // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better
+ // than PriorityQueue.
+ for (InputLocation location : inputLocations) {
+ int tsBlockIndex = location.getTsBlockIndex();
+ int columnIndex = location.getValueColumnIndex();
+ int index = updatedInputIndex[tsBlockIndex];
+
+ // current location's input column is not empty
+ if (!empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
+ TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
+ Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+ // time of current location's input column is equal to current row's time; the value at
+ // that position may still be null, in which case a null is appended for this row
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ if (valueColumn.isNull(index)) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.write(valueColumn, index);
+ }
+ // increase the index
+ index++;
+ // update the index after merging
+ updatedInputIndex[tsBlockIndex] = index;
+ // we can safely set appendValue to true and then break the loop, because these input
+ // columns' time is not overlapped
+ appendValue = true;
+ break;
+ }
+ }
+ }
+ // no input column has an entry at current row's time, so just append a null
+ if (!appendValue) {
+ columnBuilder.appendNull();
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
index 8e3c82cb9a..83a2e69733 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.operator.process.merge;
import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
@@ -28,12 +29,24 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
/** only has one input column */
public class SingleColumnMerger implements ColumnMerger {
+ private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator();
+
+ private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
+
private final InputLocation location;
- public SingleColumnMerger(InputLocation location) {
+ private final TimeComparator comparator;
+
+ public SingleColumnMerger(InputLocation location, OrderBy orderBy) {
+ this.location = location;
+ if (orderBy == OrderBy.TIMESTAMP_ASC) {
+ comparator = ASC_TIME_COMPARATOR;
+ } else { // every OrderBy value other than TIMESTAMP_ASC selects the descending comparator
+ comparator = DESC_TIME_COMPARATOR;
+ }
+ }
+ @Override
public void mergeColumn(
TsBlock[] inputTsBlocks,
int[] inputIndex,
@@ -49,19 +62,32 @@ public class SingleColumnMerger implements ColumnMerger {
// input column is empty or current time of input column is already larger than currentEndTime
// just appendNull rowCount null
if (empty(tsBlockIndex, inputTsBlocks, inputIndex)
- || inputTsBlocks[tsBlockIndex].getTimeByIndex(index) > currentEndTime) {
+ || !comparator.satisfy(inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
columnBuilder.appendNull(rowCount);
} else {
// read from input column and write it into columnBuilder
TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
for (int i = 0; i < rowCount; i++) {
- // current index is less than size of input column and current time of input column is equal
- // to result row's time and input column's value at index is not null
- if (timeColumn.getPositionCount() > index
- && timeColumn.getLong(index) == timeBuilder.getTime(i)
- && !valueColumn.isNull(index)) {
- columnBuilder.write(valueColumn, index++);
+ // current index reaches the size of input column or current time of input column is already
+ // larger than currentEndTime, use null column to fill the remaining
+ if (timeColumn.getPositionCount() == index
+ || !comparator.satisfy(
+ inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
+ columnBuilder.appendNull(rowCount - i);
+ break;
+ }
+ // current time of input column is equal to result row's time
+ if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+ // if input column's value at index is null, append a null value
+ if (valueColumn.isNull(index)) {
+ columnBuilder.appendNull();
+ } else {
+ // if input column's value at index is not null, append the value
+ columnBuilder.write(valueColumn, index);
+ }
+ // increase the index
+ index++;
} else {
// otherwise, append a null
columnBuilder.appendNull();
@@ -71,4 +97,28 @@ public class SingleColumnMerger implements ColumnMerger {
updatedInputIndex[tsBlockIndex] = index;
}
}
+
+ private interface TimeComparator {
+
+ /** @return true if time has not passed endTime in the implementation's time order, otherwise false */
+ boolean satisfy(long time, long endTime);
+ }
+
+ private static class AscTimeComparator implements TimeComparator {
+
+ /** @return for order by time asc: true if {@code time <= endTime}, otherwise false */
+ @Override
+ public boolean satisfy(long time, long endTime) {
+ return time <= endTime;
+ }
+ }
+
+ private static class DescTimeComparator implements TimeComparator {
+
+ /** @return for order by time desc: true if {@code time >= endTime}, otherwise false */
+ @Override
+ public boolean satisfy(long time, long endTime) {
+ return time >= endTime;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 230ffac797..05864ace5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -323,7 +323,7 @@ public class LocalExecutionPlanner {
ColumnMerger merger;
// only has one input column
if (outputColumn.isSingleInputColumn()) {
- merger = new SingleColumnMerger(outputColumn.getInputLocation(0));
+ merger = new SingleColumnMerger(outputColumn.getInputLocation(0), OrderBy.TIMESTAMP_ASC);
} else if (!outputColumn.isOverlapped()) {
// has more than one input columns but time of these input columns is not overlapped
throw new UnsupportedOperationException(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 7596bde6d9..65cca8e51c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -145,8 +145,8 @@ public class DataDriverTest {
OrderBy.TIMESTAMP_ASC,
Arrays.asList(TSDataType.INT32, TSDataType.INT32),
Arrays.asList(
- new SingleColumnMerger(new InputLocation(0, 0)),
- new SingleColumnMerger(new InputLocation(1, 0))));
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+ new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC)));
LimitOperator limitOperator =
new LimitOperator(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index b8591c8fdb..6fc0e04647 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -140,8 +140,8 @@ public class LimitOperatorTest {
OrderBy.TIMESTAMP_ASC,
Arrays.asList(TSDataType.INT32, TSDataType.INT32),
Arrays.asList(
- new SingleColumnMerger(new InputLocation(0, 0)),
- new SingleColumnMerger(new InputLocation(1, 0))));
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+ new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC)));
LimitOperator limitOperator =
new LimitOperator(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/MultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/MultiColumnMergerTest.java
new file mode 100644
index 0000000000..cfb043cc0c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/MultiColumnMergerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.mpp.operator.process.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MultiColumnMergerTest {
+
+ @Test
+ public void mergeTest1() { // block 0 contains a row at time 5 whose value is null
+ MultiColumnMerger merger =
+ new MultiColumnMerger(ImmutableList.of(new InputLocation(0, 0), new InputLocation(1, 0)));
+
+ TsBlockBuilder inputBuilder1 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder1.getTimeColumnBuilder().writeLong(2);
+ inputBuilder1.getColumnBuilder(0).writeInt(20);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(4);
+ inputBuilder1.getColumnBuilder(0).writeInt(40);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(5);
+ inputBuilder1.getColumnBuilder(0).appendNull();
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(6);
+ inputBuilder1.getColumnBuilder(0).writeInt(60);
+ inputBuilder1.declarePosition();
+
+ TsBlockBuilder inputBuilder2 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder2.getTimeColumnBuilder().writeLong(8);
+ inputBuilder2.getColumnBuilder(0).writeInt(800);
+ inputBuilder2.declarePosition();
+ inputBuilder2.getTimeColumnBuilder().writeLong(10);
+ inputBuilder2.getColumnBuilder(0).writeInt(1000);
+ inputBuilder2.declarePosition();
+
+ TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder1.build(), inputBuilder2.build()};
+ int[] inputIndex = new int[] {1, 0}; // start at row 1 (time 4) of block 0 and row 0 (time 8) of block 1
+ int[] updatedInputIndex = new int[] {1, 0};
+
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(4);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(5);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(6);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(7);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(8);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(9);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(10);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(11);
+ builder.declarePosition();
+ ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 11, valueColumnBuilder);
+
+ assertEquals(4, updatedInputIndex[0]); // rows at times 4, 5 and 6 of block 0 are consumed
+ assertEquals(2, updatedInputIndex[1]); // rows at times 8 and 10 of block 1 are consumed
+
+ Column result = valueColumnBuilder.build();
+
+ assertEquals(8, result.getPositionCount());
+ assertFalse(result.isNull(0));
+ assertEquals(40, result.getInt(0));
+ assertTrue(result.isNull(1)); // time 5: block 0 has a row here, but its value is null
+ assertFalse(result.isNull(2));
+ assertEquals(60, result.getInt(2));
+ assertTrue(result.isNull(3)); // time 7: no input row at this time
+ assertFalse(result.isNull(4));
+ assertEquals(800, result.getInt(4));
+ assertTrue(result.isNull(5)); // time 9: no input row at this time
+ assertFalse(result.isNull(6));
+ assertEquals(1000, result.getInt(6));
+ assertTrue(result.isNull(7)); // time 11: no input row at this time
+ }
+
+ @Test
+ public void mergeTest2() { // same inputs as mergeTest1, except block 0 has no row at time 5
+ MultiColumnMerger merger =
+ new MultiColumnMerger(ImmutableList.of(new InputLocation(0, 0), new InputLocation(1, 0)));
+
+ TsBlockBuilder inputBuilder1 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder1.getTimeColumnBuilder().writeLong(2);
+ inputBuilder1.getColumnBuilder(0).writeInt(20);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(4);
+ inputBuilder1.getColumnBuilder(0).writeInt(40);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(6);
+ inputBuilder1.getColumnBuilder(0).writeInt(60);
+ inputBuilder1.declarePosition();
+
+ TsBlockBuilder inputBuilder2 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder2.getTimeColumnBuilder().writeLong(8);
+ inputBuilder2.getColumnBuilder(0).writeInt(800);
+ inputBuilder2.declarePosition();
+ inputBuilder2.getTimeColumnBuilder().writeLong(10);
+ inputBuilder2.getColumnBuilder(0).writeInt(1000);
+ inputBuilder2.declarePosition();
+
+ TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder1.build(), inputBuilder2.build()};
+ int[] inputIndex = new int[] {1, 0}; // start at row 1 (time 4) of block 0 and row 0 (time 8) of block 1
+ int[] updatedInputIndex = new int[] {1, 0};
+
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(4);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(5);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(6);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(7);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(8);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(9);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(10);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(11);
+ builder.declarePosition();
+ ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 11, valueColumnBuilder);
+
+ assertEquals(3, updatedInputIndex[0]); // rows at times 4 and 6 of block 0 are consumed
+ assertEquals(2, updatedInputIndex[1]); // rows at times 8 and 10 of block 1 are consumed
+
+ Column result = valueColumnBuilder.build();
+
+ assertEquals(8, result.getPositionCount());
+ assertFalse(result.isNull(0));
+ assertEquals(40, result.getInt(0));
+ assertTrue(result.isNull(1)); // time 5: no input row at this time
+ assertFalse(result.isNull(2));
+ assertEquals(60, result.getInt(2));
+ assertTrue(result.isNull(3)); // time 7: no input row at this time
+ assertFalse(result.isNull(4));
+ assertEquals(800, result.getInt(4));
+ assertTrue(result.isNull(5)); // time 9: no input row at this time
+ assertFalse(result.isNull(6));
+ assertEquals(1000, result.getInt(6));
+ assertTrue(result.isNull(7)); // time 11: no input row at this time
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
index 9cd3b4bd86..38a5a550f4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.operator;
import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -39,7 +40,8 @@ public class SingleColumnMergerTest {
@Test
public void mergeTest1() {
- SingleColumnMerger merger = new SingleColumnMerger(new InputLocation(0, 0));
+ SingleColumnMerger merger =
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC);
TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
inputBuilder.getTimeColumnBuilder().writeLong(2);
@@ -48,6 +50,9 @@ public class SingleColumnMergerTest {
inputBuilder.getTimeColumnBuilder().writeLong(4);
inputBuilder.getColumnBuilder(0).writeInt(40);
inputBuilder.declarePosition();
+ inputBuilder.getTimeColumnBuilder().writeLong(5);
+ inputBuilder.getColumnBuilder(0).appendNull();
+ inputBuilder.declarePosition();
inputBuilder.getTimeColumnBuilder().writeLong(6);
inputBuilder.getColumnBuilder(0).writeInt(60);
inputBuilder.declarePosition();
@@ -71,6 +76,8 @@ public class SingleColumnMergerTest {
merger.mergeColumn(
inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 7, valueColumnBuilder);
+ assertEquals(4, updatedInputIndex[0]);
+
Column result = valueColumnBuilder.build();
assertEquals(4, result.getPositionCount());
@@ -85,7 +92,8 @@ public class SingleColumnMergerTest {
/** input tsblock is null */
@Test
public void mergeTest2() {
- SingleColumnMerger merger = new SingleColumnMerger(new InputLocation(0, 0));
+ SingleColumnMerger merger =
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC);
TsBlock[] inputTsBlocks = new TsBlock[1];
int[] inputIndex = new int[] {0};
@@ -106,6 +114,90 @@ public class SingleColumnMergerTest {
merger.mergeColumn(
inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 7, valueColumnBuilder);
+ assertEquals(0, updatedInputIndex[0]);
+
+ Column result = valueColumnBuilder.build();
+
+ assertEquals(4, result.getPositionCount());
+ assertTrue(result.isNull(0));
+ assertTrue(result.isNull(1));
+ assertTrue(result.isNull(2));
+ assertTrue(result.isNull(3));
+ }
+
+ /** asc order: the input tsblock's only time (8) is larger than currentEndTime (7), so every output row is null and the input index stays 0 */
+ @Test
+ public void mergeTest3() {
+ SingleColumnMerger merger =
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC);
+
+ TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder.getTimeColumnBuilder().writeLong(8);
+ inputBuilder.getColumnBuilder(0).writeInt(80);
+ inputBuilder.declarePosition();
+
+ TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder.build()};
+ int[] inputIndex = new int[] {0};
+ int[] updatedInputIndex = new int[] {0};
+
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(4);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(5);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(6);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(7);
+ builder.declarePosition();
+ ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 7, valueColumnBuilder);
+
+ assertEquals(0, updatedInputIndex[0]);
+
+ Column result = valueColumnBuilder.build();
+
+ assertEquals(4, result.getPositionCount());
+ assertTrue(result.isNull(0));
+ assertTrue(result.isNull(1));
+ assertTrue(result.isNull(2));
+ assertTrue(result.isNull(3));
+ }
+
+ /** current time of input tsblock is less than endTime of timeBuilder in desc order */
+ @Test
+ public void mergeTest4() {
+ SingleColumnMerger merger =
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_DESC);
+
+ TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder.getTimeColumnBuilder().writeLong(2);
+ inputBuilder.getColumnBuilder(0).writeInt(20);
+ inputBuilder.declarePosition();
+
+ TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder.build()};
+ int[] inputIndex = new int[] {0};
+ int[] updatedInputIndex = new int[] {0};
+
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(7);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(6);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(5);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(4);
+ builder.declarePosition();
+ ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 4, valueColumnBuilder);
+
+ assertEquals(0, updatedInputIndex[0]);
+
Column result = valueColumnBuilder.build();
assertEquals(4, result.getPositionCount());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 10581cc3f9..a0f4d08023 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -137,8 +137,8 @@ public class TimeJoinOperatorTest {
OrderBy.TIMESTAMP_ASC,
Arrays.asList(TSDataType.INT32, TSDataType.INT32),
Arrays.asList(
- new SingleColumnMerger(new InputLocation(0, 0)),
- new SingleColumnMerger(new InputLocation(1, 0))));
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+ new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC)));
int count = 0;
while (timeJoinOperator.hasNext()) {
TsBlock tsBlock = timeJoinOperator.next();
@@ -251,9 +251,9 @@ public class TimeJoinOperatorTest {
OrderBy.TIMESTAMP_ASC,
Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32),
Arrays.asList(
- new SingleColumnMerger(new InputLocation(0, 0)),
- new SingleColumnMerger(new InputLocation(1, 0)),
- new SingleColumnMerger(new InputLocation(2, 0))));
+ new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+ new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC),
+ new SingleColumnMerger(new InputLocation(2, 0), OrderBy.TIMESTAMP_ASC)));
int count = 0;
while (timeJoinOperator.hasNext()) {
TsBlock tsBlock = timeJoinOperator.next();