You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/05 09:19:54 UTC
[iotdb] 01/01: [IOTDB-3103] Implementation of NonOverlappedMultiColumnMerger
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 824ef3434edc5a3c79745f8ff5191ee1f71d168e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu May 5 17:19:30 2022 +0800
[IOTDB-3103] Implementation of NonOverlappedMultiColumnMerger
---
.../operator/process/TimeJoinOperator.java | 31 ++-
.../operator/process/merge/ColumnMerger.java | 2 +-
.../operator/process/merge/MultiColumnMerger.java | 4 +-
.../merge/NonOverlappedMultiColumnMerger.java | 88 ++++++++
.../operator/process/merge/SingleColumnMerger.java | 23 ++-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 17 +-
.../plan/planner/plan/parameter/OutputColumn.java | 2 +-
.../NonOverlappedMultiColumnMergerTest.java | 230 +++++++++++++++++++++
8 files changed, 377 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
index 1ca4053049..219342fe5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
@@ -128,15 +128,32 @@ public class TimeJoinOperator implements ProcessOperator {
// TsBlocks order by asc/desc
long currentEndTime = 0;
boolean init = false;
+
+ // get TsBlock for each input, put their time stamp into TimeSelector and then use the min Time
+ // among all the input TsBlock as the current output TsBlock's endTime.
for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && empty(i) && children.get(i).hasNext()) {
- inputIndex[i] = 0;
- inputTsBlocks[i] = children.get(i).next();
- if (!empty(i)) {
- int rowSize = inputTsBlocks[i].getPositionCount();
- for (int row = 0; row < rowSize; row++) {
- timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
+ if (!noMoreTsBlocks[i] && empty(i)) {
+ if (children.get(i).hasNext()) {
+ inputIndex[i] = 0;
+ inputTsBlocks[i] = children.get(i).next();
+ if (!empty(i)) {
+ int rowSize = inputTsBlocks[i].getPositionCount();
+ for (int row = 0; row < rowSize; row++) {
+ timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
+ }
+ } else {
+ // child operator has next but return an empty TsBlock which means that it may not
+ // finish calculation in given time slice.
+ // In such case, TimeJoinOperator can't go on calculating, so we just return null.
+ // We can also use the while loop here to continuously call the hasNext() and next()
+ // methods of the child operator until its hasNext() returns false or the next() gets
+ // the data that is not empty, but this will cause the execution time of the while loop
+ // to be uncontrollable and may exceed all allocated time slice
+ return null;
}
+ } else { // no more tsBlock
+ noMoreTsBlocks[i] = true;
+ inputTsBlocks[i] = null;
}
}
// update the currentEndTime if the TsBlock is not empty
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
index 731827aea5..8df6c53c55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
@@ -32,7 +32,7 @@ public interface ColumnMerger {
* @return true if TsBlock at tsBlockIndex is null or its current read index is larger than its
* size
*/
- default boolean empty(int tsBlockIndex, TsBlock[] inputTsBlocks, int[] inputIndex) {
+ static boolean empty(int tsBlockIndex, TsBlock[] inputTsBlocks, int[] inputIndex) {
return inputTsBlocks[tsBlockIndex] == null
|| inputTsBlocks[tsBlockIndex].getPositionCount() == inputIndex[tsBlockIndex];
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
index 6a2eda841b..ef9f7ceb00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
-/** has more than one input column, but these columns' time is not overlapped */
+/** has more than one input column, but these columns' time is overlapped */
public class MultiColumnMerger implements ColumnMerger {
private final List<InputLocation> inputLocations;
@@ -65,7 +65,7 @@ public class MultiColumnMerger implements ColumnMerger {
int index = updatedInputIndex[tsBlockIndex];
// current location's input column is not empty
- if (!empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
+ if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
// time of current location's input column is equal to current row's time
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
new file mode 100644
index 0000000000..edf2972e95
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger.mergeOneColumn;
+
+/** has more than one input column, but these columns' time is not overlapped */
+public class NonOverlappedMultiColumnMerger implements ColumnMerger {
+
+ private final List<InputLocation> inputLocations;
+
+ private final TimeComparator comparator;
+
+ // index for inputLocations indicating current iterating TsBlock's InputLocation
+ private int index;
+
+ /**
+ * these columns' time should never be overlapped
+ *
+ * @param inputLocations The time order in TsBlock represented by inputLocations should be
+ * incremented by timestamp if it is order by time asc, otherwise decreased by timestamp if it
+ * is order by time desc
+ */
+ public NonOverlappedMultiColumnMerger(
+ List<InputLocation> inputLocations, TimeComparator comparator) {
+ this.inputLocations = inputLocations;
+ this.comparator = comparator;
+ this.index = 0;
+ }
+
+ @Override
+ public void mergeColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ TimeColumnBuilder timeBuilder,
+ long currentEndTime,
+ ColumnBuilder columnBuilder) {
+ // move to next InputLocation if current InputLocation's column has been consumed up
+ moveToNextIfNecessary(inputTsBlocks);
+ // merge current column
+ mergeOneColumn(
+ inputTsBlocks,
+ inputIndex,
+ updatedInputIndex,
+ timeBuilder,
+ currentEndTime,
+ columnBuilder,
+ inputLocations.get(index),
+ comparator);
+ }
+
+ private void moveToNextIfNecessary(TsBlock[] inputTsBlocks) {
+ // if it is already at the last index, don't need to move
+ if (index == inputLocations.size() - 1) {
+ return;
+ }
+ InputLocation location = inputLocations.get(index);
+ int tsBlockIndex = location.getTsBlockIndex();
+ // if inputTsBlocks[tsBlockIndex] is null, means current index's TsBlock has been consumed up
+ while (index < inputLocations.size() - 1 && inputTsBlocks[tsBlockIndex] == null) {
+ index++;
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
index 92cc2f3a94..eb4743a036 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
@@ -45,6 +45,27 @@ public class SingleColumnMerger implements ColumnMerger {
TimeColumnBuilder timeBuilder,
long currentEndTime,
ColumnBuilder columnBuilder) {
+
+ mergeOneColumn(
+ inputTsBlocks,
+ inputIndex,
+ updatedInputIndex,
+ timeBuilder,
+ currentEndTime,
+ columnBuilder,
+ location,
+ comparator);
+ }
+
+ public static void mergeOneColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ TimeColumnBuilder timeBuilder,
+ long currentEndTime,
+ ColumnBuilder columnBuilder,
+ InputLocation location,
+ TimeComparator comparator) {
int tsBlockIndex = location.getTsBlockIndex();
int columnIndex = location.getValueColumnIndex();
@@ -52,7 +73,7 @@ public class SingleColumnMerger implements ColumnMerger {
int index = inputIndex[tsBlockIndex];
// input column is empty or current time of input column is already larger than currentEndTime
// just appendNull rowCount null
- if (empty(tsBlockIndex, inputTsBlocks, inputIndex)
+ if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
|| !comparator.satisfyCurEndTime(
inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
columnBuilder.appendNull(rowCount);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index fbcdf7deaf..751c2cac5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
@@ -508,15 +510,14 @@ public class LocalExecutionPlanner {
ColumnMerger merger;
// only has one input column
if (outputColumn.isSingleInputColumn()) {
- merger = new SingleColumnMerger(outputColumn.getInputLocation(0), timeComparator);
- } else if (!outputColumn.isOverlapped()) {
- // has more than one input columns but time of these input columns is not overlapped
- throw new UnsupportedOperationException(
- "has more than one input columns but time of these input columns is not overlapped is not supported");
+ merger = new SingleColumnMerger(outputColumn.getSourceLocation(0), timeComparator);
+ } else if (outputColumn.isOverlapped()) {
+ // has more than one input columns but time of these input columns is overlapped
+ merger = new MultiColumnMerger(outputColumn.getSourceLocations());
} else {
- // has more than one input columns and time of these input columns is overlapped
- throw new UnsupportedOperationException(
- "has more than one input columns and time of these input columns is overlapped is not supported");
+ // has more than one input columns and time of these input columns is not overlapped
+ merger =
+ new NonOverlappedMultiColumnMerger(outputColumn.getSourceLocations(), timeComparator);
}
mergers.add(merger);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java
index 748d8d045a..e6e71c3f24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java
@@ -65,7 +65,7 @@ public class OutputColumn {
return sourceLocations.size() == 1;
}
- public InputLocation getInputLocation(int index) {
+ public InputLocation getSourceLocation(int index) {
checkArgument(index < sourceLocations.size(), "index is not valid");
return sourceLocations.get(index);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
new file mode 100644
index 0000000000..b84f779060
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NonOverlappedMultiColumnMergerTest {
+
+ @Test
+ public void mergeTest() {
+ NonOverlappedMultiColumnMerger merger =
+ new NonOverlappedMultiColumnMerger(
+ ImmutableList.of(new InputLocation(0, 0), new InputLocation(1, 0)),
+ new AscTimeComparator());
+
+ TsBlockBuilder inputBuilder1 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder1.getTimeColumnBuilder().writeLong(2);
+ inputBuilder1.getColumnBuilder(0).writeInt(20);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(4);
+ inputBuilder1.getColumnBuilder(0).writeInt(40);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(5);
+ inputBuilder1.getColumnBuilder(0).appendNull();
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(6);
+ inputBuilder1.getColumnBuilder(0).writeInt(60);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(7);
+ inputBuilder1.getColumnBuilder(0).appendNull();
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(8);
+ inputBuilder1.getColumnBuilder(0).writeInt(80);
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(9);
+ inputBuilder1.getColumnBuilder(0).appendNull();
+ inputBuilder1.declarePosition();
+ inputBuilder1.getTimeColumnBuilder().writeLong(10);
+ inputBuilder1.getColumnBuilder(0).writeInt(100);
+ inputBuilder1.declarePosition();
+
+ TsBlockBuilder inputBuilder2 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ inputBuilder2.getTimeColumnBuilder().writeLong(11);
+ inputBuilder2.getColumnBuilder(0).writeInt(110);
+ inputBuilder2.declarePosition();
+ inputBuilder2.getTimeColumnBuilder().writeLong(12);
+ inputBuilder2.getColumnBuilder(0).writeInt(120);
+ inputBuilder2.declarePosition();
+ inputBuilder2.getTimeColumnBuilder().writeLong(13);
+ inputBuilder2.getColumnBuilder(0).writeInt(130);
+ inputBuilder2.declarePosition();
+ inputBuilder2.getTimeColumnBuilder().writeLong(14);
+ inputBuilder2.getColumnBuilder(0).writeInt(140);
+ inputBuilder2.declarePosition();
+
+ TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder1.build(), inputBuilder2.build()};
+ int[] inputIndex = new int[] {1, 0};
+ int[] updatedInputIndex = new int[] {1, 0};
+
+ // current endTime is 10
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(4);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(5);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(6);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(7);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(8);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(9);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(10);
+ builder.declarePosition();
+
+ ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 10, valueColumnBuilder);
+
+ assertEquals(8, updatedInputIndex[0]);
+ assertEquals(0, updatedInputIndex[1]);
+
+ Column result = valueColumnBuilder.build();
+
+ assertEquals(7, result.getPositionCount());
+ assertFalse(result.isNull(0));
+ assertEquals(40, result.getInt(0));
+ assertTrue(result.isNull(1));
+ assertFalse(result.isNull(2));
+ assertEquals(60, result.getInt(2));
+ assertTrue(result.isNull(3));
+ assertFalse(result.isNull(4));
+ assertEquals(80, result.getInt(4));
+ assertTrue(result.isNull(5));
+ assertFalse(result.isNull(6));
+ assertEquals(100, result.getInt(6));
+
+ // update inputIndex using shadowInputIndex
+ System.arraycopy(updatedInputIndex, 0, inputIndex, 0, 2);
+ inputTsBlocks[0] = null;
+
+ // current endTime is 13
+ builder.reset();
+
+ timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(11);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(12);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(13);
+ builder.declarePosition();
+
+ valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 13, valueColumnBuilder);
+
+ assertEquals(8, updatedInputIndex[0]);
+ assertEquals(3, updatedInputIndex[1]);
+
+ result = valueColumnBuilder.build();
+
+ assertEquals(3, result.getPositionCount());
+ assertFalse(result.isNull(0));
+ assertEquals(110, result.getInt(0));
+ assertFalse(result.isNull(1));
+ assertEquals(120, result.getInt(1));
+ assertFalse(result.isNull(2));
+ assertEquals(130, result.getInt(2));
+
+ // update inputIndex using shadowInputIndex
+ System.arraycopy(updatedInputIndex, 0, inputIndex, 0, 2);
+
+ // current endTime is 16
+ builder.reset();
+
+ timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(14);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(15);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(16);
+ builder.declarePosition();
+
+ valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 16, valueColumnBuilder);
+
+ assertEquals(8, updatedInputIndex[0]);
+ assertEquals(4, updatedInputIndex[1]);
+
+ result = valueColumnBuilder.build();
+
+ assertEquals(3, result.getPositionCount());
+ assertFalse(result.isNull(0));
+ assertEquals(140, result.getInt(0));
+ assertTrue(result.isNull(1));
+ assertTrue(result.isNull(2));
+
+ // update inputIndex using shadowInputIndex
+ System.arraycopy(updatedInputIndex, 0, inputIndex, 0, 2);
+ inputTsBlocks[1] = null;
+
+ // current endTime is 20
+ builder.reset();
+
+ timeColumnBuilder = builder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(17);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(18);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(19);
+ builder.declarePosition();
+ timeColumnBuilder.writeLong(20);
+ builder.declarePosition();
+
+ valueColumnBuilder = builder.getColumnBuilder(0);
+
+ merger.mergeColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 20, valueColumnBuilder);
+
+ assertEquals(8, updatedInputIndex[0]);
+ assertEquals(4, updatedInputIndex[1]);
+
+ result = valueColumnBuilder.build();
+
+ assertEquals(4, result.getPositionCount());
+ assertTrue(result.isNull(0));
+ assertTrue(result.isNull(1));
+ assertTrue(result.isNull(2));
+ assertTrue(result.isNull(3));
+ }
+}