You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/05 09:19:54 UTC

[iotdb] 01/01: [IOTDB-3103] Implementation of NonOverlappedMultiColumnMerger

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 824ef3434edc5a3c79745f8ff5191ee1f71d168e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu May 5 17:19:30 2022 +0800

    [IOTDB-3103] Implementation of NonOverlappedMultiColumnMerger
---
 .../operator/process/TimeJoinOperator.java         |  31 ++-
 .../operator/process/merge/ColumnMerger.java       |   2 +-
 .../operator/process/merge/MultiColumnMerger.java  |   4 +-
 .../merge/NonOverlappedMultiColumnMerger.java      |  88 ++++++++
 .../operator/process/merge/SingleColumnMerger.java |  23 ++-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  17 +-
 .../plan/planner/plan/parameter/OutputColumn.java  |   2 +-
 .../NonOverlappedMultiColumnMergerTest.java        | 230 +++++++++++++++++++++
 8 files changed, 377 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
index 1ca4053049..219342fe5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
@@ -128,15 +128,32 @@ public class TimeJoinOperator implements ProcessOperator {
     // TsBlocks order by asc/desc
     long currentEndTime = 0;
     boolean init = false;
+
+    // get TsBlock for each input, put their time stamp into TimeSelector and then use the min Time
+    // among all the input TsBlock as the current output TsBlock's endTime.
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && empty(i) && children.get(i).hasNext()) {
-        inputIndex[i] = 0;
-        inputTsBlocks[i] = children.get(i).next();
-        if (!empty(i)) {
-          int rowSize = inputTsBlocks[i].getPositionCount();
-          for (int row = 0; row < rowSize; row++) {
-            timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
+      if (!noMoreTsBlocks[i] && empty(i)) {
+        if (children.get(i).hasNext()) {
+          inputIndex[i] = 0;
+          inputTsBlocks[i] = children.get(i).next();
+          if (!empty(i)) {
+            int rowSize = inputTsBlocks[i].getPositionCount();
+            for (int row = 0; row < rowSize; row++) {
+              timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
+            }
+          } else {
+            // child operator has next but return an empty TsBlock which means that it may not
+            // finish calculation in given time slice.
+            // In such case, TimeJoinOperator can't go on calculating, so we just return null.
+            // We can also use the while loop here to continuously call the hasNext() and next()
+            // methods of the child operator until its hasNext() returns false or the next() gets
+            // the data that is not empty, but this will cause the execution time of the while loop
+            // to be uncontrollable and may exceed all allocated time slice
+            return null;
           }
+        } else { // no more tsBlock
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
         }
       }
       // update the currentEndTime if the TsBlock is not empty
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
index 731827aea5..8df6c53c55 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
@@ -32,7 +32,7 @@ public interface ColumnMerger {
    * @return true if TsBlock at tsBlockIndex is null or its current read index is larger than its
    *     size
    */
-  default boolean empty(int tsBlockIndex, TsBlock[] inputTsBlocks, int[] inputIndex) {
+  static boolean empty(int tsBlockIndex, TsBlock[] inputTsBlocks, int[] inputIndex) {
     return inputTsBlocks[tsBlockIndex] == null
         || inputTsBlocks[tsBlockIndex].getPositionCount() == inputIndex[tsBlockIndex];
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
index 6a2eda841b..ef9f7ceb00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-/** has more than one input column, but these columns' time is not overlapped */
+/** has more than one input column, but these columns' time is overlapped */
 public class MultiColumnMerger implements ColumnMerger {
 
   private final List<InputLocation> inputLocations;
@@ -65,7 +65,7 @@ public class MultiColumnMerger implements ColumnMerger {
         int index = updatedInputIndex[tsBlockIndex];
 
         // current location's input column is not empty
-        if (!empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
+        if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
           TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
           Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
           // time of current location's input column is equal to current row's time
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
new file mode 100644
index 0000000000..edf2972e95
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+
+import static org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger.mergeOneColumn;
+
+/** has more than one input column, but these columns' time is not overlapped */
+public class NonOverlappedMultiColumnMerger implements ColumnMerger {
+
+  private final List<InputLocation> inputLocations;
+
+  private final TimeComparator comparator;
+
+  // index for inputLocations indicating current iterating TsBlock's InputLocation
+  private int index;
+
+  /**
+   * these columns' time should never be overlapped
+   *
+   * @param inputLocations The time order in TsBlock represented by inputLocations should be
+   *     incremented by timestamp if it is order by time asc, otherwise decreased by timestamp if it
+   *     is order by time desc
+   */
+  public NonOverlappedMultiColumnMerger(
+      List<InputLocation> inputLocations, TimeComparator comparator) {
+    this.inputLocations = inputLocations;
+    this.comparator = comparator;
+    this.index = 0;
+  }
+
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      TimeColumnBuilder timeBuilder,
+      long currentEndTime,
+      ColumnBuilder columnBuilder) {
+    // move to next InputLocation if current InputLocation's column has been consumed up
+    moveToNextIfNecessary(inputTsBlocks);
+    // merge current column
+    mergeOneColumn(
+        inputTsBlocks,
+        inputIndex,
+        updatedInputIndex,
+        timeBuilder,
+        currentEndTime,
+        columnBuilder,
+        inputLocations.get(index),
+        comparator);
+  }
+
+  private void moveToNextIfNecessary(TsBlock[] inputTsBlocks) {
+    // if it is already at the last index, don't need to move
+    if (index == inputLocations.size() - 1) {
+      return;
+    }
+    InputLocation location = inputLocations.get(index);
+    int tsBlockIndex = location.getTsBlockIndex();
+    // if inputTsBlocks[tsBlockIndex] is null, means current index's TsBlock has been consumed up
+    while (index < inputLocations.size() - 1 && inputTsBlocks[tsBlockIndex] == null) {
+      index++;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
index 92cc2f3a94..eb4743a036 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
@@ -45,6 +45,27 @@ public class SingleColumnMerger implements ColumnMerger {
       TimeColumnBuilder timeBuilder,
       long currentEndTime,
       ColumnBuilder columnBuilder) {
+
+    mergeOneColumn(
+        inputTsBlocks,
+        inputIndex,
+        updatedInputIndex,
+        timeBuilder,
+        currentEndTime,
+        columnBuilder,
+        location,
+        comparator);
+  }
+
+  public static void mergeOneColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      TimeColumnBuilder timeBuilder,
+      long currentEndTime,
+      ColumnBuilder columnBuilder,
+      InputLocation location,
+      TimeComparator comparator) {
     int tsBlockIndex = location.getTsBlockIndex();
     int columnIndex = location.getValueColumnIndex();
 
@@ -52,7 +73,7 @@ public class SingleColumnMerger implements ColumnMerger {
     int index = inputIndex[tsBlockIndex];
     // input column is empty or current time of input column is already larger than currentEndTime
     // just appendNull rowCount null
-    if (empty(tsBlockIndex, inputTsBlocks, inputIndex)
+    if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
         || !comparator.satisfyCurEndTime(
             inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
       columnBuilder.appendNull(rowCount);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index fbcdf7deaf..751c2cac5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
@@ -508,15 +510,14 @@ public class LocalExecutionPlanner {
         ColumnMerger merger;
         // only has one input column
         if (outputColumn.isSingleInputColumn()) {
-          merger = new SingleColumnMerger(outputColumn.getInputLocation(0), timeComparator);
-        } else if (!outputColumn.isOverlapped()) {
-          // has more than one input columns but time of these input columns is not overlapped
-          throw new UnsupportedOperationException(
-              "has more than one input columns but time of these input columns is not overlapped is not supported");
+          merger = new SingleColumnMerger(outputColumn.getSourceLocation(0), timeComparator);
+        } else if (outputColumn.isOverlapped()) {
+          // has more than one input columns but time of these input columns is overlapped
+          merger = new MultiColumnMerger(outputColumn.getSourceLocations());
         } else {
-          // has more than one input columns and time of these input columns is overlapped
-          throw new UnsupportedOperationException(
-              "has more than one input columns and time of these input columns is overlapped is not supported");
+          // has more than one input columns and time of these input columns is not overlapped
+          merger =
+              new NonOverlappedMultiColumnMerger(outputColumn.getSourceLocations(), timeComparator);
         }
         mergers.add(merger);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java
index 748d8d045a..e6e71c3f24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/OutputColumn.java
@@ -65,7 +65,7 @@ public class OutputColumn {
     return sourceLocations.size() == 1;
   }
 
-  public InputLocation getInputLocation(int index) {
+  public InputLocation getSourceLocation(int index) {
     checkArgument(index < sourceLocations.size(), "index is not valid");
     return sourceLocations.get(index);
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
new file mode 100644
index 0000000000..b84f779060
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class NonOverlappedMultiColumnMergerTest {
+
+  @Test
+  public void mergeTest() {
+    NonOverlappedMultiColumnMerger merger =
+        new NonOverlappedMultiColumnMerger(
+            ImmutableList.of(new InputLocation(0, 0), new InputLocation(1, 0)),
+            new AscTimeComparator());
+
+    TsBlockBuilder inputBuilder1 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder1.getTimeColumnBuilder().writeLong(2);
+    inputBuilder1.getColumnBuilder(0).writeInt(20);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(4);
+    inputBuilder1.getColumnBuilder(0).writeInt(40);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(5);
+    inputBuilder1.getColumnBuilder(0).appendNull();
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(6);
+    inputBuilder1.getColumnBuilder(0).writeInt(60);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(7);
+    inputBuilder1.getColumnBuilder(0).appendNull();
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(8);
+    inputBuilder1.getColumnBuilder(0).writeInt(80);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(9);
+    inputBuilder1.getColumnBuilder(0).appendNull();
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(10);
+    inputBuilder1.getColumnBuilder(0).writeInt(100);
+    inputBuilder1.declarePosition();
+
+    TsBlockBuilder inputBuilder2 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder2.getTimeColumnBuilder().writeLong(11);
+    inputBuilder2.getColumnBuilder(0).writeInt(110);
+    inputBuilder2.declarePosition();
+    inputBuilder2.getTimeColumnBuilder().writeLong(12);
+    inputBuilder2.getColumnBuilder(0).writeInt(120);
+    inputBuilder2.declarePosition();
+    inputBuilder2.getTimeColumnBuilder().writeLong(13);
+    inputBuilder2.getColumnBuilder(0).writeInt(130);
+    inputBuilder2.declarePosition();
+    inputBuilder2.getTimeColumnBuilder().writeLong(14);
+    inputBuilder2.getColumnBuilder(0).writeInt(140);
+    inputBuilder2.declarePosition();
+
+    TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder1.build(), inputBuilder2.build()};
+    int[] inputIndex = new int[] {1, 0};
+    int[] updatedInputIndex = new int[] {1, 0};
+
+    // current endTime is 10
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(4);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(5);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(6);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(7);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(8);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(9);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(10);
+    builder.declarePosition();
+
+    ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 10, valueColumnBuilder);
+
+    assertEquals(8, updatedInputIndex[0]);
+    assertEquals(0, updatedInputIndex[1]);
+
+    Column result = valueColumnBuilder.build();
+
+    assertEquals(7, result.getPositionCount());
+    assertFalse(result.isNull(0));
+    assertEquals(40, result.getInt(0));
+    assertTrue(result.isNull(1));
+    assertFalse(result.isNull(2));
+    assertEquals(60, result.getInt(2));
+    assertTrue(result.isNull(3));
+    assertFalse(result.isNull(4));
+    assertEquals(80, result.getInt(4));
+    assertTrue(result.isNull(5));
+    assertFalse(result.isNull(6));
+    assertEquals(100, result.getInt(6));
+
+    // update inputIndex using shadowInputIndex
+    System.arraycopy(updatedInputIndex, 0, inputIndex, 0, 2);
+    inputTsBlocks[0] = null;
+
+    // current endTime is 13
+    builder.reset();
+
+    timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(11);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(12);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(13);
+    builder.declarePosition();
+
+    valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 13, valueColumnBuilder);
+
+    assertEquals(8, updatedInputIndex[0]);
+    assertEquals(3, updatedInputIndex[1]);
+
+    result = valueColumnBuilder.build();
+
+    assertEquals(3, result.getPositionCount());
+    assertFalse(result.isNull(0));
+    assertEquals(110, result.getInt(0));
+    assertFalse(result.isNull(1));
+    assertEquals(120, result.getInt(1));
+    assertFalse(result.isNull(2));
+    assertEquals(130, result.getInt(2));
+
+    // update inputIndex using shadowInputIndex
+    System.arraycopy(updatedInputIndex, 0, inputIndex, 0, 2);
+
+    // current endTime is 16
+    builder.reset();
+
+    timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(14);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(15);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(16);
+    builder.declarePosition();
+
+    valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 16, valueColumnBuilder);
+
+    assertEquals(8, updatedInputIndex[0]);
+    assertEquals(4, updatedInputIndex[1]);
+
+    result = valueColumnBuilder.build();
+
+    assertEquals(3, result.getPositionCount());
+    assertFalse(result.isNull(0));
+    assertEquals(140, result.getInt(0));
+    assertTrue(result.isNull(1));
+    assertTrue(result.isNull(2));
+
+    // update inputIndex using shadowInputIndex
+    System.arraycopy(updatedInputIndex, 0, inputIndex, 0, 2);
+    inputTsBlocks[1] = null;
+
+    // current endTime is 20
+    builder.reset();
+
+    timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(17);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(18);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(19);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(20);
+    builder.declarePosition();
+
+    valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 20, valueColumnBuilder);
+
+    assertEquals(8, updatedInputIndex[0]);
+    assertEquals(4, updatedInputIndex[1]);
+
+    result = valueColumnBuilder.build();
+
+    assertEquals(4, result.getPositionCount());
+    assertTrue(result.isNull(0));
+    assertTrue(result.isNull(1));
+    assertTrue(result.isNull(2));
+    assertTrue(result.isNull(3));
+  }
+}