You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/01 07:19:56 UTC

[iotdb] 01/01: Improve the performance of Raw Query Without ValueFilter for nonAligned

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4006
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 102b86b7d0488eeb63231cff90caeb5b59e490bf
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 1 15:18:56 2022 +0800

    Improve the performance of Raw Query Without ValueFilter for nonAligned
---
 .../operator/process/DeviceMergeOperator.java      |  2 +-
 .../RowBasedTimeJoinOperator.java}                 | 62 ++++++++++++----------
 .../process/{ => join}/TimeJoinOperator.java       |  7 +--
 .../{ => join}/merge/AscTimeComparator.java        |  2 +-
 .../process/{ => join}/merge/ColumnMerger.java     | 24 ++++++++-
 .../{ => join}/merge/DescTimeComparator.java       |  2 +-
 .../{ => join}/merge/MultiColumnMerger.java        | 56 ++++++++++++++++++-
 .../merge/NonOverlappedMultiColumnMerger.java      | 25 +++++++--
 .../{ => join}/merge/SingleColumnMerger.java       | 44 ++++++++++++++-
 .../process/{ => join}/merge/TimeComparator.java   |  2 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 19 +++----
 .../iotdb/db/mpp/execution/DataDriverTest.java     |  6 +--
 .../operator/AlignedSeriesScanOperatorTest.java    |  8 +--
 .../operator/DeviceMergeOperatorTest.java          |  2 +-
 .../mpp/execution/operator/LimitOperatorTest.java  |  6 +--
 .../execution/operator/MultiColumnMergerTest.java  |  2 +-
 .../NonOverlappedMultiColumnMergerTest.java        |  4 +-
 .../mpp/execution/operator/OffsetOperatorTest.java |  6 +--
 .../operator/RawDataAggregationOperatorTest.java   |  6 +--
 .../execution/operator/SingleColumnMergerTest.java |  6 +--
 .../execution/operator/TimeJoinOperatorTest.java   |  8 +--
 21 files changed, 220 insertions(+), 79 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index d16d47aa28..e0ccbe86ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
similarity index 85%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 6775c66447..f026ef35c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,7 +38,7 @@ import java.util.List;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 
-public class TimeJoinOperator implements ProcessOperator {
+public class RowBasedTimeJoinOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
 
@@ -79,7 +80,7 @@ public class TimeJoinOperator implements ProcessOperator {
 
   private final TimeComparator comparator;
 
-  public TimeJoinOperator(
+  public RowBasedTimeJoinOperator(
       OperatorContext operatorContext,
       List<Operator> children,
       Ordering mergeOrder,
@@ -139,10 +140,7 @@ public class TimeJoinOperator implements ProcessOperator {
           inputIndex[i] = 0;
           inputTsBlocks[i] = children.get(i).next();
           if (!empty(i)) {
-            int rowSize = inputTsBlocks[i].getPositionCount();
-            for (int row = 0; row < rowSize; row++) {
-              timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
-            }
+            updateTimeSelector(i);
           } else {
             // child operator has next but return an empty TsBlock which means that it may not
             // finish calculation in given time slice.
@@ -175,26 +173,30 @@ public class TimeJoinOperator implements ProcessOperator {
     }
 
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    while (!timeSelector.isEmpty()
-        && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
-      timeBuilder.writeLong(timeSelector.pollFirst());
-      tsBlockBuilder.declarePosition();
-    }
-
-    for (int i = 0; i < outputColumnCount; i++) {
-      ColumnMerger merger = mergers.get(i);
-      merger.mergeColumn(
-          inputTsBlocks,
-          inputIndex,
-          shadowInputIndex,
-          timeBuilder,
-          currentEndTime,
-          tsBlockBuilder.getColumnBuilder(i));
-    }
-
-    // update inputIndex using shadowInputIndex
-    System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputOperatorsCount);
+    long currentTime;
+    do {
+      currentTime = timeSelector.pollFirst();
+      timeBuilder.writeLong(currentTime);
+      for (int i = 0; i < outputColumnCount; i++) {
+        ColumnMerger merger = mergers.get(i);
+        merger.mergeColumn(
+            inputTsBlocks,
+            inputIndex,
+            shadowInputIndex,
+            currentTime,
+            tsBlockBuilder.getColumnBuilder(i));
+      }
 
+      for (int i = 0; i < inputOperatorsCount; i++) {
+        if (inputIndex[i] != shadowInputIndex[i]) {
+          inputIndex[i] = shadowInputIndex[i];
+          if (!empty(i)) {
+            updateTimeSelector(i);
+          }
+        }
+      }
+      tsBlockBuilder.declarePosition();
+    } while (currentTime < currentEndTime && !timeSelector.isEmpty());
     return tsBlockBuilder.build();
   }
 
@@ -242,6 +244,10 @@ public class TimeJoinOperator implements ProcessOperator {
     return finished;
   }
 
+  private void updateTimeSelector(int index) {
+    timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
+  }
+
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
    * return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index 6775c66447..eb185e09ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
index 95b7316844..96d9f86f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public class AscTimeComparator implements TimeComparator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
index 8df6c53c55..7a00f2adf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
@@ -38,7 +38,8 @@ public interface ColumnMerger {
   }
 
   /**
-   * merge columns belonging to same series into one column
+   * merge columns belonging to same series into one column, merge until each input column's time is
+   * larger than currentEndTime
    *
    * @param inputTsBlocks all source TsBlocks, some of which will contain source column
    * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
@@ -57,4 +58,23 @@ public interface ColumnMerger {
       TimeColumnBuilder timeBuilder,
       long currentEndTime,
       ColumnBuilder columnBuilder);
+
+  /**
+   * merge columns belonging to same series into one column, merge just one row whose time is equal
+   * to currentTime
+   *
+   * @param inputTsBlocks all source TsBlocks, some of which will contain source column
+   * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
+   *     we should only read from this array and not update it because others will use the start
+   *     index value in inputIndex array
+   * @param updatedInputIndex current index for each source TsBlock after merging
+   * @param currentTime merge just one row whose time is equal to currentTime
+   * @param columnBuilder used to write merged value into
+   */
+  void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
index f53c97d8fe..67c0112dab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public class DescTimeComparator implements TimeComparator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
similarity index 63%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
index ef9f7ceb00..79a1367ca7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -93,4 +93,58 @@ public class MultiColumnMerger implements ColumnMerger {
       }
     }
   }
+
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder) {
+
+    // init startIndex for each input locations
+    for (InputLocation inputLocation : inputLocations) {
+      int tsBlockIndex = inputLocation.getTsBlockIndex();
+      updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex];
+    }
+
+    // record whether current row already has value to be appended
+    boolean appendValue = false;
+    // we don't use MinHeap here to choose the right column, because inputLocations.size() won't
+    // be very large.
+    // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better
+    // than PriorityQueue.
+    for (InputLocation location : inputLocations) {
+      int tsBlockIndex = location.getTsBlockIndex();
+      int columnIndex = location.getValueColumnIndex();
+      int index = updatedInputIndex[tsBlockIndex];
+
+      // current location's input column is not empty
+      if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
+        TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
+        Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+        // time of current location's input column is equal to current row's time
+        if (timeColumn.getLong(index) == currentTime) {
+          // value of current location's input column is not null
+          if (!valueColumn.isNull(index)) {
+            columnBuilder.write(valueColumn, index);
+            appendValue = true;
+          }
+          // increase the index
+          index++;
+          // update the index after merging
+          updatedInputIndex[tsBlockIndex] = index;
+          // we can safely set appendValue to true and then break the loop, because these input
+          // columns' time is not overlapped
+          if (appendValue) {
+            break;
+          }
+        }
+      }
+    }
+    // all input columns are null at current row, so just append a null
+    if (!appendValue) {
+      columnBuilder.appendNull();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
index a22c31faf5..c338130b8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -25,8 +25,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-import static org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger.mergeOneColumn;
-
 /** has more than one input column, but these columns' time is not overlapped */
 public class NonOverlappedMultiColumnMerger implements ColumnMerger {
 
@@ -62,7 +60,7 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger {
     // move to next InputLocation if current InputLocation's column has been consumed up
     moveToNextIfNecessary(inputTsBlocks);
     // merge current column
-    mergeOneColumn(
+    SingleColumnMerger.mergeOneColumn(
         inputTsBlocks,
         inputIndex,
         updatedInputIndex,
@@ -73,6 +71,25 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger {
         comparator);
   }
 
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder) {
+    // move to next InputLocation if current InputLocation's column has been consumed up
+    moveToNextIfNecessary(inputTsBlocks);
+    // merge current column
+    SingleColumnMerger.mergeOneColumn(
+        inputTsBlocks,
+        inputIndex,
+        updatedInputIndex,
+        currentTime,
+        columnBuilder,
+        inputLocations.get(index));
+  }
+
   private void moveToNextIfNecessary(TsBlock[] inputTsBlocks) {
     // if it is already at the last index, don't need to move
     if (index == inputLocations.size() - 1) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
index f621f59d99..0c9f7e60e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -110,4 +110,46 @@ public class SingleColumnMerger implements ColumnMerger {
     // update the index after merging
     updatedInputIndex[tsBlockIndex] = index;
   }
+
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder) {
+    mergeOneColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, currentTime, columnBuilder, location);
+  }
+
+  public static void mergeOneColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder,
+      InputLocation location) {
+    int tsBlockIndex = location.getTsBlockIndex();
+    int columnIndex = location.getValueColumnIndex();
+
+    int index = inputIndex[tsBlockIndex];
+    // input column is empty or current time of input column is already larger than currentEndTime
+    // just appendNull
+    if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
+        || inputTsBlocks[tsBlockIndex].getTimeByIndex(index) != currentTime) {
+      columnBuilder.appendNull();
+    } else {
+      // read from input column and write it into columnBuilder
+      Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+
+      if (valueColumn.isNull(index)) {
+        columnBuilder.appendNull();
+      } else {
+        columnBuilder.write(valueColumn, index);
+      }
+      index++;
+    }
+    // update the index after merging
+    updatedInputIndex[tsBlockIndex] = index;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
index db017f6186..47eff4fe08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public interface TimeComparator {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 099d3d5594..a981c9e631 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
@@ -68,19 +67,21 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.DoublePr
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.FloatPreviousFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
 import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
 import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
@@ -1146,7 +1147,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
 
     context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
-    return new TimeJoinOperator(
+    return new RowBasedTimeJoinOperator(
         operatorContext,
         children,
         node.getMergeOrder(),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index dc13943bef..c71db0a64d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -38,9 +38,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
index b45a11f668..e1a6bc5eb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -30,10 +30,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
index 9aa09f18a5..18c00627e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
index 6625f296e6..32701ac2b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
index 16e03c71e7..4668fb67f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
index b84f779060..76216233a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
index 9256c852c9..e05cad8041 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -31,9 +31,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 9b9b81a856..6a9902f24b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -34,9 +34,9 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
index 3b12082c6e..f2043c281a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 6030229577..aabfdb7eb2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -29,10 +29,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;