You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/25 11:20:31 UTC

[iotdb] 01/01: Implement MultiColumnMerger to support querying data of one timesereis distributed on different DataNodes

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c1f76ec2bfa611afa71d3c934c608dec0cb44c30
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Apr 25 19:20:14 2022 +0800

    Implement MultiColumnMerger to support querying data of one timesereis distributed on different DataNodes
---
 .../mpp/operator/process/merge/ColumnMerger.java   |   4 +-
 .../operator/process/merge/MultiColumnMerger.java  |  96 +++++++++++
 .../operator/process/merge/SingleColumnMerger.java |  66 +++++++-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   2 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |   4 +-
 .../iotdb/db/mpp/operator/LimitOperatorTest.java   |   4 +-
 .../db/mpp/operator/MultiColumnMergerTest.java     | 185 +++++++++++++++++++++
 .../db/mpp/operator/SingleColumnMergerTest.java    |  96 ++++++++++-
 .../db/mpp/operator/TimeJoinOperatorTest.java      |  10 +-
 9 files changed, 446 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java
index 04ed2bb09e..0f325ac3bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/ColumnMerger.java
@@ -41,7 +41,9 @@ public interface ColumnMerger {
    * merge columns belonging to same series into one column
    *
    * @param inputTsBlocks all source TsBlocks, some of which will cantain source column
-   * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks
+   * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
+   *     we should only read from this array and not update it because others will use the start
+   *     index value in inputIndex array
    * @param updatedInputIndex current index for each source TsBlock after merging
    * @param timeBuilder result time column, which is already generated and used to indicate each
    *     row's timestamp
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/MultiColumnMerger.java
new file mode 100644
index 0000000000..2ff29d06be
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/MultiColumnMerger.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator.process.merge;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+
+/** has more than one input column, but these columns' time is not overlapped */
+public class MultiColumnMerger implements ColumnMerger {
+
+  private final List<InputLocation> inputLocations;
+
+  public MultiColumnMerger(List<InputLocation> inputLocations) {
+    this.inputLocations = inputLocations;
+  }
+
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      TimeColumnBuilder timeBuilder,
+      long currentEndTime,
+      ColumnBuilder columnBuilder) {
+    int rowCount = timeBuilder.getPositionCount();
+
+    // init startIndex for each input locations
+    for (InputLocation inputLocation : inputLocations) {
+      int tsBlockIndex = inputLocation.getTsBlockIndex();
+      updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex];
+    }
+
+    for (int i = 0; i < rowCount; i++) {
+      // record whether current row already has value to be appended
+      boolean appendValue = false;
+      // we don't use MinHeap here to choose the right column, because inputLocations.size() won't
+      // be very large.
+      // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better
+      // than PriorityQueue.
+      for (InputLocation location : inputLocations) {
+        int tsBlockIndex = location.getTsBlockIndex();
+        int columnIndex = location.getValueColumnIndex();
+        int index = updatedInputIndex[tsBlockIndex];
+
+        // current location's input column is not empty
+        if (!empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
+          TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
+          Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+          // time of current location's input column is equal to current row's time and value of
+          // current location's input column is not null
+          if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+            if (valueColumn.isNull(index)) {
+              columnBuilder.appendNull();
+            } else {
+              columnBuilder.write(valueColumn, index);
+            }
+            // increase the index
+            index++;
+            // update the index after merging
+            updatedInputIndex[tsBlockIndex] = index;
+            // we can safely set appendValue to true and then break the loop, because these input
+            // columns' time is not overlapped
+            appendValue = true;
+            break;
+          }
+        }
+      }
+      // all input columns are null at current row, so just append a null
+      if (!appendValue) {
+        columnBuilder.appendNull();
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
index 8e3c82cb9a..83a2e69733 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/merge/SingleColumnMerger.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.operator.process.merge;
 
 import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
@@ -28,12 +29,24 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 /** only has one input column */
 public class SingleColumnMerger implements ColumnMerger {
 
+  private static final TimeComparator ASC_TIME_COMPARATOR = new AscTimeComparator();
+
+  private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator();
+
   private final InputLocation location;
 
-  public SingleColumnMerger(InputLocation location) {
+  private final TimeComparator comparator;
+
+  public SingleColumnMerger(InputLocation location, OrderBy orderBy) {
     this.location = location;
+    if (orderBy == OrderBy.TIMESTAMP_ASC) {
+      comparator = ASC_TIME_COMPARATOR;
+    } else {
+      comparator = DESC_TIME_COMPARATOR;
+    }
   }
 
+  @Override
   public void mergeColumn(
       TsBlock[] inputTsBlocks,
       int[] inputIndex,
@@ -49,19 +62,32 @@ public class SingleColumnMerger implements ColumnMerger {
     // input column is empty or current time of input column is already larger than currentEndTime
     // just appendNull rowCount null
     if (empty(tsBlockIndex, inputTsBlocks, inputIndex)
-        || inputTsBlocks[tsBlockIndex].getTimeByIndex(index) > currentEndTime) {
+        || !comparator.satisfy(inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
       columnBuilder.appendNull(rowCount);
     } else {
       // read from input column and write it into columnBuilder
       TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
       Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
       for (int i = 0; i < rowCount; i++) {
-        // current index is less than size of input column and current time of input column is equal
-        // to result row's time and input column's value at index is not null
-        if (timeColumn.getPositionCount() > index
-            && timeColumn.getLong(index) == timeBuilder.getTime(i)
-            && !valueColumn.isNull(index)) {
-          columnBuilder.write(valueColumn, index++);
+        // current index reaches the size of input column or current time of input column is already
+        // larger than currentEndTime, use null column to fill the remaining
+        if (timeColumn.getPositionCount() == index
+            || !comparator.satisfy(
+                inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
+          columnBuilder.appendNull(rowCount - i);
+          break;
+        }
+        // current time of input column is equal to result row's time
+        if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
+          // if input column's value at index is null, append a null value
+          if (valueColumn.isNull(index)) {
+            columnBuilder.appendNull();
+          } else {
+            // if input column's value at index is not null, append the value
+            columnBuilder.write(valueColumn, index);
+          }
+          // increase the index
+          index++;
         } else {
           // otherwise, append a null
           columnBuilder.appendNull();
@@ -71,4 +97,28 @@ public class SingleColumnMerger implements ColumnMerger {
       updatedInputIndex[tsBlockIndex] = index;
     }
   }
+
+  private interface TimeComparator {
+
+    /** @return true if time is satisfied with endTime, otherwise false */
+    boolean satisfy(long time, long endTime);
+  }
+
+  private static class AscTimeComparator implements TimeComparator {
+
+    /** @return if order by time asc, return true if time <= endTime, otherwise false */
+    @Override
+    public boolean satisfy(long time, long endTime) {
+      return time <= endTime;
+    }
+  }
+
+  private static class DescTimeComparator implements TimeComparator {
+
+    /** @return if order by time desc, return true if time >= endTime, otherwise false */
+    @Override
+    public boolean satisfy(long time, long endTime) {
+      return time >= endTime;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 230ffac797..05864ace5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -323,7 +323,7 @@ public class LocalExecutionPlanner {
         ColumnMerger merger;
         // only has one input column
         if (outputColumn.isSingleInputColumn()) {
-          merger = new SingleColumnMerger(outputColumn.getInputLocation(0));
+          merger = new SingleColumnMerger(outputColumn.getInputLocation(0), OrderBy.TIMESTAMP_ASC);
         } else if (!outputColumn.isOverlapped()) {
           // has more than one input columns but time of these input columns is not overlapped
           throw new UnsupportedOperationException(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index 7596bde6d9..65cca8e51c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -145,8 +145,8 @@ public class DataDriverTest {
               OrderBy.TIMESTAMP_ASC,
               Arrays.asList(TSDataType.INT32, TSDataType.INT32),
               Arrays.asList(
-                  new SingleColumnMerger(new InputLocation(0, 0)),
-                  new SingleColumnMerger(new InputLocation(1, 0))));
+                  new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+                  new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC)));
 
       LimitOperator limitOperator =
           new LimitOperator(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
index b8591c8fdb..6fc0e04647 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/LimitOperatorTest.java
@@ -140,8 +140,8 @@ public class LimitOperatorTest {
               OrderBy.TIMESTAMP_ASC,
               Arrays.asList(TSDataType.INT32, TSDataType.INT32),
               Arrays.asList(
-                  new SingleColumnMerger(new InputLocation(0, 0)),
-                  new SingleColumnMerger(new InputLocation(1, 0))));
+                  new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+                  new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC)));
 
       LimitOperator limitOperator =
           new LimitOperator(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/MultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/MultiColumnMergerTest.java
new file mode 100644
index 0000000000..cfb043cc0c
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/MultiColumnMergerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.mpp.operator.process.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MultiColumnMergerTest {
+
+  @Test
+  public void mergeTest1() {
+    MultiColumnMerger merger =
+        new MultiColumnMerger(ImmutableList.of(new InputLocation(0, 0), new InputLocation(1, 0)));
+
+    TsBlockBuilder inputBuilder1 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder1.getTimeColumnBuilder().writeLong(2);
+    inputBuilder1.getColumnBuilder(0).writeInt(20);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(4);
+    inputBuilder1.getColumnBuilder(0).writeInt(40);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(5);
+    inputBuilder1.getColumnBuilder(0).appendNull();
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(6);
+    inputBuilder1.getColumnBuilder(0).writeInt(60);
+    inputBuilder1.declarePosition();
+
+    TsBlockBuilder inputBuilder2 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder2.getTimeColumnBuilder().writeLong(8);
+    inputBuilder2.getColumnBuilder(0).writeInt(800);
+    inputBuilder2.declarePosition();
+    inputBuilder2.getTimeColumnBuilder().writeLong(10);
+    inputBuilder2.getColumnBuilder(0).writeInt(1000);
+    inputBuilder2.declarePosition();
+
+    TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder1.build(), inputBuilder2.build()};
+    int[] inputIndex = new int[] {1, 0};
+    int[] updatedInputIndex = new int[] {1, 0};
+
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(4);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(5);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(6);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(7);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(8);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(9);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(10);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(11);
+    builder.declarePosition();
+    ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 11, valueColumnBuilder);
+
+    assertEquals(4, updatedInputIndex[0]);
+    assertEquals(2, updatedInputIndex[1]);
+
+    Column result = valueColumnBuilder.build();
+
+    assertEquals(8, result.getPositionCount());
+    assertFalse(result.isNull(0));
+    assertEquals(40, result.getInt(0));
+    assertTrue(result.isNull(1));
+    assertFalse(result.isNull(2));
+    assertEquals(60, result.getInt(2));
+    assertTrue(result.isNull(3));
+    assertFalse(result.isNull(4));
+    assertEquals(800, result.getInt(4));
+    assertTrue(result.isNull(5));
+    assertFalse(result.isNull(6));
+    assertEquals(1000, result.getInt(6));
+    assertTrue(result.isNull(7));
+  }
+
+  @Test
+  public void mergeTest2() {
+    MultiColumnMerger merger =
+        new MultiColumnMerger(ImmutableList.of(new InputLocation(0, 0), new InputLocation(1, 0)));
+
+    TsBlockBuilder inputBuilder1 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder1.getTimeColumnBuilder().writeLong(2);
+    inputBuilder1.getColumnBuilder(0).writeInt(20);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(4);
+    inputBuilder1.getColumnBuilder(0).writeInt(40);
+    inputBuilder1.declarePosition();
+    inputBuilder1.getTimeColumnBuilder().writeLong(6);
+    inputBuilder1.getColumnBuilder(0).writeInt(60);
+    inputBuilder1.declarePosition();
+
+    TsBlockBuilder inputBuilder2 = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder2.getTimeColumnBuilder().writeLong(8);
+    inputBuilder2.getColumnBuilder(0).writeInt(800);
+    inputBuilder2.declarePosition();
+    inputBuilder2.getTimeColumnBuilder().writeLong(10);
+    inputBuilder2.getColumnBuilder(0).writeInt(1000);
+    inputBuilder2.declarePosition();
+
+    TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder1.build(), inputBuilder2.build()};
+    int[] inputIndex = new int[] {1, 0};
+    int[] updatedInputIndex = new int[] {1, 0};
+
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(4);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(5);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(6);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(7);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(8);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(9);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(10);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(11);
+    builder.declarePosition();
+    ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 11, valueColumnBuilder);
+
+    assertEquals(3, updatedInputIndex[0]);
+    assertEquals(2, updatedInputIndex[1]);
+
+    Column result = valueColumnBuilder.build();
+
+    assertEquals(8, result.getPositionCount());
+    assertFalse(result.isNull(0));
+    assertEquals(40, result.getInt(0));
+    assertTrue(result.isNull(1));
+    assertFalse(result.isNull(2));
+    assertEquals(60, result.getInt(2));
+    assertTrue(result.isNull(3));
+    assertFalse(result.isNull(4));
+    assertEquals(800, result.getInt(4));
+    assertTrue(result.isNull(5));
+    assertFalse(result.isNull(6));
+    assertEquals(1000, result.getInt(6));
+    assertTrue(result.isNull(7));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
index 9cd3b4bd86..38a5a550f4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SingleColumnMergerTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.operator;
 
 import org.apache.iotdb.db.mpp.operator.process.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
@@ -39,7 +40,8 @@ public class SingleColumnMergerTest {
 
   @Test
   public void mergeTest1() {
-    SingleColumnMerger merger = new SingleColumnMerger(new InputLocation(0, 0));
+    SingleColumnMerger merger =
+        new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC);
 
     TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
     inputBuilder.getTimeColumnBuilder().writeLong(2);
@@ -48,6 +50,9 @@ public class SingleColumnMergerTest {
     inputBuilder.getTimeColumnBuilder().writeLong(4);
     inputBuilder.getColumnBuilder(0).writeInt(40);
     inputBuilder.declarePosition();
+    inputBuilder.getTimeColumnBuilder().writeLong(5);
+    inputBuilder.getColumnBuilder(0).appendNull();
+    inputBuilder.declarePosition();
     inputBuilder.getTimeColumnBuilder().writeLong(6);
     inputBuilder.getColumnBuilder(0).writeInt(60);
     inputBuilder.declarePosition();
@@ -71,6 +76,8 @@ public class SingleColumnMergerTest {
     merger.mergeColumn(
         inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 7, valueColumnBuilder);
 
+    assertEquals(4, updatedInputIndex[0]);
+
     Column result = valueColumnBuilder.build();
 
     assertEquals(4, result.getPositionCount());
@@ -85,7 +92,8 @@ public class SingleColumnMergerTest {
   /** input tsblock is null */
   @Test
   public void mergeTest2() {
-    SingleColumnMerger merger = new SingleColumnMerger(new InputLocation(0, 0));
+    SingleColumnMerger merger =
+        new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC);
 
     TsBlock[] inputTsBlocks = new TsBlock[1];
     int[] inputIndex = new int[] {0};
@@ -106,6 +114,90 @@ public class SingleColumnMergerTest {
     merger.mergeColumn(
         inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 7, valueColumnBuilder);
 
+    assertEquals(0, updatedInputIndex[0]);
+
+    Column result = valueColumnBuilder.build();
+
+    assertEquals(4, result.getPositionCount());
+    assertTrue(result.isNull(0));
+    assertTrue(result.isNull(1));
+    assertTrue(result.isNull(2));
+    assertTrue(result.isNull(3));
+  }
+
+  /** current time of input tsblock is larger than endTime of timeBuilder in asc order */
+  @Test
+  public void mergeTest3() {
+    SingleColumnMerger merger =
+        new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC);
+
+    TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder.getTimeColumnBuilder().writeLong(8);
+    inputBuilder.getColumnBuilder(0).writeInt(80);
+    inputBuilder.declarePosition();
+
+    TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder.build()};
+    int[] inputIndex = new int[] {0};
+    int[] updatedInputIndex = new int[] {0};
+
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(4);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(5);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(6);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(7);
+    builder.declarePosition();
+    ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 7, valueColumnBuilder);
+
+    assertEquals(0, updatedInputIndex[0]);
+
+    Column result = valueColumnBuilder.build();
+
+    assertEquals(4, result.getPositionCount());
+    assertTrue(result.isNull(0));
+    assertTrue(result.isNull(1));
+    assertTrue(result.isNull(2));
+    assertTrue(result.isNull(3));
+  }
+
+  /** current time of input tsblock is less than endTime of timeBuilder in desc order */
+  @Test
+  public void mergeTest4() {
+    SingleColumnMerger merger =
+        new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_DESC);
+
+    TsBlockBuilder inputBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    inputBuilder.getTimeColumnBuilder().writeLong(2);
+    inputBuilder.getColumnBuilder(0).writeInt(20);
+    inputBuilder.declarePosition();
+
+    TsBlock[] inputTsBlocks = new TsBlock[] {inputBuilder.build()};
+    int[] inputIndex = new int[] {0};
+    int[] updatedInputIndex = new int[] {0};
+
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(7);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(6);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(5);
+    builder.declarePosition();
+    timeColumnBuilder.writeLong(4);
+    builder.declarePosition();
+    ColumnBuilder valueColumnBuilder = builder.getColumnBuilder(0);
+
+    merger.mergeColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, timeColumnBuilder, 4, valueColumnBuilder);
+
+    assertEquals(0, updatedInputIndex[0]);
+
     Column result = valueColumnBuilder.build();
 
     assertEquals(4, result.getPositionCount());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
index 10581cc3f9..a0f4d08023 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/TimeJoinOperatorTest.java
@@ -137,8 +137,8 @@ public class TimeJoinOperatorTest {
               OrderBy.TIMESTAMP_ASC,
               Arrays.asList(TSDataType.INT32, TSDataType.INT32),
               Arrays.asList(
-                  new SingleColumnMerger(new InputLocation(0, 0)),
-                  new SingleColumnMerger(new InputLocation(1, 0))));
+                  new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+                  new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC)));
       int count = 0;
       while (timeJoinOperator.hasNext()) {
         TsBlock tsBlock = timeJoinOperator.next();
@@ -251,9 +251,9 @@ public class TimeJoinOperatorTest {
               OrderBy.TIMESTAMP_ASC,
               Arrays.asList(TSDataType.INT32, TSDataType.INT32, TSDataType.INT32),
               Arrays.asList(
-                  new SingleColumnMerger(new InputLocation(0, 0)),
-                  new SingleColumnMerger(new InputLocation(1, 0)),
-                  new SingleColumnMerger(new InputLocation(2, 0))));
+                  new SingleColumnMerger(new InputLocation(0, 0), OrderBy.TIMESTAMP_ASC),
+                  new SingleColumnMerger(new InputLocation(1, 0), OrderBy.TIMESTAMP_ASC),
+                  new SingleColumnMerger(new InputLocation(2, 0), OrderBy.TIMESTAMP_ASC)));
       int count = 0;
       while (timeJoinOperator.hasNext()) {
         TsBlock tsBlock = timeJoinOperator.next();