You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/01 07:19:56 UTC
[iotdb] 01/01: Improve the performance of Raw Query Without ValueFilter for nonAligned
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4006
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 102b86b7d0488eeb63231cff90caeb5b59e490bf
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon Aug 1 15:18:56 2022 +0800
Improve the performance of Raw Query Without ValueFilter for nonAligned
---
.../operator/process/DeviceMergeOperator.java | 2 +-
.../RowBasedTimeJoinOperator.java} | 62 ++++++++++++----------
.../process/{ => join}/TimeJoinOperator.java | 7 +--
.../{ => join}/merge/AscTimeComparator.java | 2 +-
.../process/{ => join}/merge/ColumnMerger.java | 24 ++++++++-
.../{ => join}/merge/DescTimeComparator.java | 2 +-
.../{ => join}/merge/MultiColumnMerger.java | 56 ++++++++++++++++++-
.../merge/NonOverlappedMultiColumnMerger.java | 25 +++++++--
.../{ => join}/merge/SingleColumnMerger.java | 44 ++++++++++++++-
.../process/{ => join}/merge/TimeComparator.java | 2 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 19 +++----
.../iotdb/db/mpp/execution/DataDriverTest.java | 6 +--
.../operator/AlignedSeriesScanOperatorTest.java | 8 +--
.../operator/DeviceMergeOperatorTest.java | 2 +-
.../mpp/execution/operator/LimitOperatorTest.java | 6 +--
.../execution/operator/MultiColumnMergerTest.java | 2 +-
.../NonOverlappedMultiColumnMergerTest.java | 4 +-
.../mpp/execution/operator/OffsetOperatorTest.java | 6 +--
.../operator/RawDataAggregationOperatorTest.java | 6 +--
.../execution/operator/SingleColumnMergerTest.java | 6 +--
.../execution/operator/TimeJoinOperatorTest.java | 8 +--
21 files changed, 220 insertions(+), 79 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index d16d47aa28..e0ccbe86ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
similarity index 85%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 6775c66447..f026ef35c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,7 +38,7 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.successfulAsList;
-public class TimeJoinOperator implements ProcessOperator {
+public class RowBasedTimeJoinOperator implements ProcessOperator {
private final OperatorContext operatorContext;
@@ -79,7 +80,7 @@ public class TimeJoinOperator implements ProcessOperator {
private final TimeComparator comparator;
- public TimeJoinOperator(
+ public RowBasedTimeJoinOperator(
OperatorContext operatorContext,
List<Operator> children,
Ordering mergeOrder,
@@ -139,10 +140,7 @@ public class TimeJoinOperator implements ProcessOperator {
inputIndex[i] = 0;
inputTsBlocks[i] = children.get(i).next();
if (!empty(i)) {
- int rowSize = inputTsBlocks[i].getPositionCount();
- for (int row = 0; row < rowSize; row++) {
- timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
- }
+ updateTimeSelector(i);
} else {
// child operator has next but return an empty TsBlock which means that it may not
// finish calculation in given time slice.
@@ -175,26 +173,30 @@ public class TimeJoinOperator implements ProcessOperator {
}
TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
- while (!timeSelector.isEmpty()
- && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
- timeBuilder.writeLong(timeSelector.pollFirst());
- tsBlockBuilder.declarePosition();
- }
-
- for (int i = 0; i < outputColumnCount; i++) {
- ColumnMerger merger = mergers.get(i);
- merger.mergeColumn(
- inputTsBlocks,
- inputIndex,
- shadowInputIndex,
- timeBuilder,
- currentEndTime,
- tsBlockBuilder.getColumnBuilder(i));
- }
-
- // update inputIndex using shadowInputIndex
- System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputOperatorsCount);
+ long currentTime;
+ do {
+ currentTime = timeSelector.pollFirst();
+ timeBuilder.writeLong(currentTime);
+ for (int i = 0; i < outputColumnCount; i++) {
+ ColumnMerger merger = mergers.get(i);
+ merger.mergeColumn(
+ inputTsBlocks,
+ inputIndex,
+ shadowInputIndex,
+ currentTime,
+ tsBlockBuilder.getColumnBuilder(i));
+ }
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (inputIndex[i] != shadowInputIndex[i]) {
+ inputIndex[i] = shadowInputIndex[i];
+ if (!empty(i)) {
+ updateTimeSelector(i);
+ }
+ }
+ }
+ tsBlockBuilder.declarePosition();
+ } while (currentTime < currentEndTime && !timeSelector.isEmpty());
return tsBlockBuilder.build();
}
@@ -242,6 +244,10 @@ public class TimeJoinOperator implements ProcessOperator {
return finished;
}
+ private void updateTimeSelector(int index) {
+ timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
+ }
+
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
* return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
index 6775c66447..eb185e09ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.join;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
index 95b7316844..96d9f86f59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
public class AscTimeComparator implements TimeComparator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
index 8df6c53c55..7a00f2adf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
@@ -38,7 +38,8 @@ public interface ColumnMerger {
}
/**
- * merge columns belonging to same series into one column
+ * merge columns belonging to same series into one column, merge until each input column's time is
+ * larger than currentEndTime
*
* @param inputTsBlocks all source TsBlocks, some of which will contain source column
* @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
@@ -57,4 +58,23 @@ public interface ColumnMerger {
TimeColumnBuilder timeBuilder,
long currentEndTime,
ColumnBuilder columnBuilder);
+
+ /**
+ * merge columns belonging to same series into one column, merge just one row whose time is equal
+ * to currentTime
+ *
+ * @param inputTsBlocks all source TsBlocks, some of which will contain source column
+ * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
+ * we should only read from this array and not update it because others will use the start
+ * index value in inputIndex array
+ * @param updatedInputIndex current index for each source TsBlock after merging
+ * @param currentTime merge just one row whose time is equal to currentTime
+ * @param columnBuilder used to write merged value into
+ */
+ void mergeColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ long currentTime,
+ ColumnBuilder columnBuilder);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
index f53c97d8fe..67c0112dab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
public class DescTimeComparator implements TimeComparator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
similarity index 63%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
index ef9f7ceb00..79a1367ca7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/MultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -93,4 +93,58 @@ public class MultiColumnMerger implements ColumnMerger {
}
}
}
+
+ @Override
+ public void mergeColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ long currentTime,
+ ColumnBuilder columnBuilder) {
+
+ // init startIndex for each input locations
+ for (InputLocation inputLocation : inputLocations) {
+ int tsBlockIndex = inputLocation.getTsBlockIndex();
+ updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex];
+ }
+
+ // record whether current row already has value to be appended
+ boolean appendValue = false;
+ // we don't use MinHeap here to choose the right column, because inputLocations.size() won't
+ // be very large.
+ // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better
+ // than PriorityQueue.
+ for (InputLocation location : inputLocations) {
+ int tsBlockIndex = location.getTsBlockIndex();
+ int columnIndex = location.getValueColumnIndex();
+ int index = updatedInputIndex[tsBlockIndex];
+
+ // current location's input column is not empty
+ if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
+ TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
+ Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+ // time of current location's input column is equal to current row's time
+ if (timeColumn.getLong(index) == currentTime) {
+ // value of current location's input column is not null
+ if (!valueColumn.isNull(index)) {
+ columnBuilder.write(valueColumn, index);
+ appendValue = true;
+ }
+ // increase the index
+ index++;
+ // update the index after merging
+ updatedInputIndex[tsBlockIndex] = index;
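+ // stop scanning the remaining input locations once a non-null value has been appended for this row.
+ // NOTE(review): a later location that also holds currentTime will not have its index advanced; confirm this is safe when inputs overlap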
+ if (appendValue) {
+ break;
+ }
+ }
+ }
+ }
+ // all input columns are null at current row, so just append a null
+ if (!appendValue) {
+ columnBuilder.appendNull();
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
index a22c31faf5..c338130b8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/NonOverlappedMultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -25,8 +25,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
-import static org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger.mergeOneColumn;
-
/** has more than one input column, but these columns' time is not overlapped */
public class NonOverlappedMultiColumnMerger implements ColumnMerger {
@@ -62,7 +60,7 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger {
// move to next InputLocation if current InputLocation's column has been consumed up
moveToNextIfNecessary(inputTsBlocks);
// merge current column
- mergeOneColumn(
+ SingleColumnMerger.mergeOneColumn(
inputTsBlocks,
inputIndex,
updatedInputIndex,
@@ -73,6 +71,25 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger {
comparator);
}
+ @Override
+ public void mergeColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ long currentTime,
+ ColumnBuilder columnBuilder) {
+ // move to next InputLocation if current InputLocation's column has been consumed up
+ moveToNextIfNecessary(inputTsBlocks);
+ // merge current column
+ SingleColumnMerger.mergeOneColumn(
+ inputTsBlocks,
+ inputIndex,
+ updatedInputIndex,
+ currentTime,
+ columnBuilder,
+ inputLocations.get(index));
+ }
+
private void moveToNextIfNecessary(TsBlock[] inputTsBlocks) {
// if it is already at the last index, don't need to move
if (index == inputLocations.size() - 1) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
index f621f59d99..0c9f7e60e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -110,4 +110,46 @@ public class SingleColumnMerger implements ColumnMerger {
// update the index after merging
updatedInputIndex[tsBlockIndex] = index;
}
+
+ @Override
+ public void mergeColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ long currentTime,
+ ColumnBuilder columnBuilder) {
+ mergeOneColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, currentTime, columnBuilder, location);
+ }
+
+ public static void mergeOneColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ long currentTime,
+ ColumnBuilder columnBuilder,
+ InputLocation location) {
+ int tsBlockIndex = location.getTsBlockIndex();
+ int columnIndex = location.getValueColumnIndex();
+
+ int index = inputIndex[tsBlockIndex];
+ // input column is empty or the time at the current index of the input column is not equal to
+ // currentTime, so just appendNull and leave the index unchanged
+ if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
+ || inputTsBlocks[tsBlockIndex].getTimeByIndex(index) != currentTime) {
+ columnBuilder.appendNull();
+ } else {
+ // read from input column and write it into columnBuilder
+ Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
+
+ if (valueColumn.isNull(index)) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.write(valueColumn, index);
+ }
+ index++;
+ }
+ // update the index after merging
+ updatedInputIndex[tsBlockIndex] = index;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
index db017f6186..47eff4fe08 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process.merge;
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
public interface TimeComparator {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 099d3d5594..a981c9e631 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
@@ -68,19 +67,21 @@ import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.DoublePr
import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.FloatPreviousFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPreviousFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryCollectOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator;
import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
@@ -1146,7 +1147,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new TimeJoinOperator(
+ return new RowBasedTimeJoinOperator(
operatorContext,
children,
node.getMergeOrder(),
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
index dc13943bef..c71db0a64d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/DataDriverTest.java
@@ -38,9 +38,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
index b45a11f668..e1a6bc5eb6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AlignedSeriesScanOperatorTest.java
@@ -30,10 +30,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
index 9aa09f18a5..18c00627e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/DeviceMergeOperatorTest.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
index 6625f296e6..32701ac2b0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LimitOperatorTest.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
index 16e03c71e7..4668fb67f6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MultiColumnMergerTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.MultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
index b84f779060..76216233a1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/NonOverlappedMultiColumnMergerTest.java
@@ -18,8 +18,8 @@
*/
package org.apache.iotdb.db.mpp.execution.operator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.NonOverlappedMultiColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
index 9256c852c9..e05cad8041 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OffsetOperatorTest.java
@@ -31,9 +31,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 9b9b81a856..6a9902f24b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -34,9 +34,9 @@ import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
index 3b12082c6e..f2043c281a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleColumnMergerTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.db.mpp.execution.operator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
index 6030229577..aabfdb7eb2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
@@ -29,10 +29,10 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;