You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/01 07:19:55 UTC

[iotdb] branch IOTDB-4006 created (now 102b86b7d0)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch IOTDB-4006
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 102b86b7d0 Improve the performance of Raw Query Without ValueFilter for nonAligned

This branch includes the following new commits:

     new 102b86b7d0 Improve the performance of Raw Query Without ValueFilter for nonAligned

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Improve the performance of Raw Query Without ValueFilter for nonAligned

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4006
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 102b86b7d0488eeb63231cff90caeb5b59e490bf
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 1 15:18:56 2022 +0800

    Improve the performance of Raw Query Without ValueFilter for nonAligned
---
 .../operator/process/DeviceMergeOperator.java      |  2 +-
 .../RowBasedTimeJoinOperator.java}                 | 62 ++++++++++++----------
 .../process/{ => join}/TimeJoinOperator.java       |  7 +--
 .../{ => join}/merge/AscTimeComparator.java        |  2 +-
 .../process/{ => join}/merge/ColumnMerger.java     | 24 ++++++++-
 .../{ => join}/merge/DescTimeComparator.java       |  2 +-
 .../{ => join}/merge/MultiColumnMerger.java        | 56 ++++++++++++++++++-
 .../merge/NonOverlappedMultiColumnMerger.java      | 25 +++++++--
 .../{ => join}/merge/SingleColumnMerger.java       | 44 ++++++++++++++-
 .../process/{ => join}/merge/TimeComparator.java   |  2 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 19 +++----
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  6 +--
 .../operator/AlignedSeriesScanOperatorTest.java    |  8 +--
 .../operator/DeviceMergeOperatorTest.java          |  2 +-
 .../mpp/execution/operator/LimitOperatorTest.java  |  6 +--
 .../execution/operator/MultiColumnMergerTest.java  |  2 +-
 .../NonOverlappedMultiColumnMergerTest.java        |  4 +-
 .../mpp/execution/operator/OffsetOperatorTest.java |  6 +--
 .../operator/RawDataAggregationOperatorTest.java   |  6 +--
 .../execution/operator/SingleColumnMergerTest.java |  6 +--
 .../execution/operator/TimeJoinOperatorTest.java   |  8 +--
 21 files changed, 220 insertions(+), 79 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index d16d47aa28..e0ccbe86ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
similarity index 85%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 6775c66447..f026ef35c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,7 +38,7 @@ import java.util.List;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 
-public class TimeJoinOperator implements ProcessOperator {
+public class RowBasedTimeJoinOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
 
@@ -79,7 +80,7 @@ public class TimeJoinOperator implements ProcessOperator {
 
   private final TimeComparator comparator;
 
-  public TimeJoinOperator(
+  public RowBasedTimeJoinOperator(
       OperatorContext operatorContext,
       List<Operator> children,
       Ordering mergeOrder,
@@ -139,10 +140,7 @@ public class TimeJoinOperator implements ProcessOperator {
           inputIndex[i] = 0;
           inputTsBlocks[i] = children.get(i).next();
           if (!empty(i)) {
-            int rowSize = inputTsBlocks[i].getPositionCount();
-            for (int row = 0; row < rowSize; row++) {
-              timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
-            }
+            updateTimeSelector(i);
           } else {
             // child operator has next but return an empty TsBlock which means that it may not
             // finish calculation in given time slice.
@@ -175,26 +173,30 @@ public class TimeJoinOperator implements ProcessOperator {
     }
 
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    while (!timeSelector.isEmpty()
-        && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
-      timeBuilder.writeLong(timeSelector.pollFirst());
-      tsBlockBuilder.declarePosition();
-    }
-
-    for (int i = 0; i < outputColumnCount; i++) {
-      ColumnMerger merger = mergers.get(i);
-      merger.mergeColumn(
-          inputTsBlocks,
-          inputIndex,
-          shadowInputIndex,
-          timeBuilder,
-          currentEndTime,
-          tsBlockBuilder.getColumnBuilder(i));
-    }
-
-    // update inputIndex using shadowInputIndex
-    System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputOperatorsCount);
+    long currentTime;
+    do {
+      currentTime = timeSelector.pollFirst();
+      timeBuilder.writeLong(currentTime);
+      for (int i = 0; i < outputColumnCount; i++) {
+        ColumnMerger merger = mergers.get(i);
+        merger.mergeColumn(
+            inputTsBlocks,
+            inputIndex,
+            shadowInputIndex,
+            currentTime,
+            tsBlockBuilder.getColumnBuilder(i));
+      }
 
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        if (inputIndex[i] != shadowInputIndex[i]) {
+          inputIndex[i] = shadowInputIndex[i];
+          if (!empty(i)) {
+            updateTimeSelector(i);
+          }
+        }
+      }
+      tsBlockBuilder.declarePosition();
+    } while (currentTime < currentEndTime && !timeSelector.isEmpty());
     return tsBlockBuilder.build();
   }
 
@@ -242,6 +244,10 @@ public class TimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  private void updateTimeSelector(int index) {
+    timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index 6775c66447..eb185e09ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
index 95b7316844..96d9f86f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public class AscTimeComparator implements TimeComparator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
index 8df6c53c55..7a00f2adf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
@@ -38,7 +38,8 @@ public interface ColumnMerger {
   }
 
   /**
-   * merge columns belonging to same series into one column
+   * merge columns belonging to same series into one column, merge until each input column's time is
+   * larger than currentEndTime
    *
    * @param inputTsBlocks all source TsBlocks, some of which will contain source column
    * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
@@ -57,4 +58,23 @@ public interface ColumnMerger {
       TimeColumnBuilder timeBuilder,
       long currentEndTime,
       ColumnBuilder columnBuilder);
+
+  /**
+   * merge columns belonging to same series into one column, merge just one row whose time is equal
+   * to currentTime
+   *
+   * @param inputTsBlocks all source TsBlocks, some of which will contain source column
+   * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
+   *     we should only read from this array and not update it because others will use the start
+   *     index value in inputIndex array
+   * @param updatedInputIndex current index for each source TsBlock after merging
+   * @param currentTime merge just one row whose time is equal to currentTime
+   * @param columnBuilder used to write merged value into
+   */
+  void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
index f53c97d8fe..67c0112dab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public class DescTimeComparator implements TimeComparator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
similarity index 63%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
index ef9f7ceb00..79a1367ca7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -93,4 +93,58 @@ public class MultiColumnMerger implements ColumnMerger {
       }
     }
   }
+
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder) {
+
+    // init startIndex for each input locations
+    for (InputLocation inputLocation : inputLocations) {
+      int tsBlockIndex = inputLocation.getTsBlockIndex();
+      updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex];
+    }
+
+    // record whether current row already has value to be appended
+    boolean appendValue = false;
+    // we don't use MinHeap here to choose the right column, because inputLocations.size() won't
+    // be very large.
+    // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better
+    // than PriorityQueue.
+    for (InputLocation location : inputLocations) {
+      int tsBlockIndex = location.getTsBlockIndex();
+      int columnIndex = location.getValueColumnIndex();
+      int index = updatedInputIndex[tsBlockIndex];
+
+      // current location's input column is not empty
+      if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
+        TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
+        Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+        // time of current location's input column is equal to current row's time
+        if (timeColumn.getLong(index) == currentTime) {
+          // value of current location's input column is not null
+          if (!valueColumn.isNull(index)) {
+            columnBuilder.write(valueColumn, index);
+            appendValue = true;
+          }
+          // increase the index
+          index++;
+          // update the index after merging
+          updatedInputIndex[tsBlockIndex] = index;
+          // once a value has been appended we break out of the loop; this relies on these input
+          // columns' time not being overlapped (NOTE(review): confirm for MultiColumnMerger)
+          if (appendValue) {
+            break;
+          }
+        }
+      }
+    }
+    // all input columns are null at current row, so just append a null
+    if (!appendValue) {
+      columnBuilder.appendNull();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
index a22c31faf5..c338130b8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -25,8 +25,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-import static org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger.mergeOneColumn;
-
 /** has more than one input column, but these columns' time is not overlapped */
 public class NonOverlappedMultiColumnMerger implements ColumnMerger {
 
@@ -62,7 +60,7 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger {
     // move to next InputLocation if current InputLocation's column has been consumed up
     moveToNextIfNecessary(inputTsBlocks);
     // merge current column
-    mergeOneColumn(
+    SingleColumnMerger.mergeOneColumn(
         inputTsBlocks,
         inputIndex,
         updatedInputIndex,
@@ -73,6 +71,25 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger {
         comparator);
   }
 
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder) {
+    // move to next InputLocation if current InputLocation's column has been consumed up
+    moveToNextIfNecessary(inputTsBlocks);
+    // merge current column
+    SingleColumnMerger.mergeOneColumn(
+        inputTsBlocks,
+        inputIndex,
+        updatedInputIndex,
+        currentTime,
+        columnBuilder,
+        inputLocations.get(index));
+  }
+
   private void moveToNextIfNecessary(TsBlock[] inputTsBlocks) {
     // if it is already at the last index, don't need to move
     if (index == inputLocations.size() - 1) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
index f621f59d99..0c9f7e60e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -110,4 +110,46 @@ public class SingleColumnMerger implements ColumnMerger {
     // update the index after merging
     updatedInputIndex[tsBlockIndex] = index;
   }
+
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder) {
+    mergeOneColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, currentTime, columnBuilder, location);
+  }
+
+  public static void mergeOneColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder,
+      InputLocation location) {
+    int tsBlockIndex = location.getTsBlockIndex();
+    int columnIndex = location.getValueColumnIndex();
+
+    int index = inputIndex[tsBlockIndex];
+    // input column is empty or the time at its current index is not equal to currentTime,
+    // just appendNull
+    if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
+        || inputTsBlocks[tsBlockIndex].getTimeByIndex(index) != currentTime) {
+      columnBuilder.appendNull();
+    } else {
+      // read from input column and write it into columnBuilder
+      Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+
+      if (valueColumn.isNull(index)) {
+        columnBuilder.appendNull();
+      } else {
+        columnBuilder.write(valueColumn, index);
+      }
+      index++;
+    }
+    // update the index after merging
+    updatedInputIndex[tsBlockIndex] = index;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
index db017f6186..47eff4fe08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public interface TimeComparator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 099d3d5594..a981c9e631 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
@@ -68,19 +67,21 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.DoublePr
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.FloatPreviousFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
@@ -1146,7 +1147,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new TimeJoinOperator(
+    return new RowBasedTimeJoinOperator(
         operatorContext,
         children,
         node.getMergeOrder(),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index dc13943bef..c71db0a64d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -38,9 +38,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
index b45a11f668..e1a6bc5eb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -30,10 +30,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
index 9aa09f18a5..18c00627e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
index 6625f296e6..32701ac2b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
index 16e03c71e7..4668fb67f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
index b84f779060..76216233a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
index 9256c852c9..e05cad8041 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -31,9 +31,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 9b9b81a856..6a9902f24b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -34,9 +34,9 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
index 3b12082c6e..f2043c281a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 6030229577..aabfdb7eb2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -29,10 +29,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;