You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/22 08:55:28 UTC
[iotdb] 01/02: start
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-3883
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fe98c3362fc7d815632ae2c00941d7a23c12959e
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Jul 21 14:58:16 2022 +0800
start
---
.../db/mpp/execution/operator/LastQueryUtil.java | 37 ++++
.../operator/process/ProcessOperator.java | 4 +-
.../process/{ => last}/LastQueryMergeOperator.java | 38 ++---
.../operator/process/last/LastQueryOperator.java | 137 +++++++++++++++
.../process/last/LastQuerySortOperator.java | 190 +++++++++++++++++++++
.../{ => last}/UpdateLastCacheOperator.java | 3 +-
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 8 +-
...peratorTest.java => LastQueryOperatorTest.java} | 34 ++--
.../operator/UpdateLastCacheOperatorTest.java | 2 +-
9 files changed, 407 insertions(+), 46 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
index 50e08472d3..408e5cd56c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryUtil.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.Gt;
@@ -36,6 +37,7 @@ import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
public class LastQueryUtil {
@@ -66,6 +68,41 @@ public class LastQueryUtil {
builder.declarePosition();
}
+
+  /**
+   * Appends every row of {@code tsBlock} to {@code builder}, keeping the rows in order.
+   *
+   * <p>Row copying is delegated to {@link #appendLastValue(TsBlockBuilder, TsBlock, int)} so
+   * that the column layout (time, timeseries, value, dataType) is written in a single place.
+   *
+   * @param builder destination builder that receives the rows
+   * @param tsBlock source block; nothing is appended when it is empty
+   */
+  public static void appendLastValue(TsBlockBuilder builder, TsBlock tsBlock) {
+    if (tsBlock.isEmpty()) {
+      return;
+    }
+    for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) {
+      appendLastValue(builder, tsBlock, i);
+    }
+  }
+
+  /** Copies row {@code index} of {@code tsBlock} into {@code builder} as one new position. */
+  public static void appendLastValue(TsBlockBuilder builder, TsBlock tsBlock, int index) {
+    // time column
+    builder.getTimeColumnBuilder().writeLong(tsBlock.getTimeByIndex(index));
+    // value columns 0, 1 and 2 hold timeseries, value and dataType; each is copied as a Binary
+    final int valueColumnCount = 3;
+    for (int column = 0; column < valueColumnCount; column++) {
+      builder.getColumnBuilder(column).writeBinary(tsBlock.getColumn(column).getBinary(index));
+    }
+    builder.declarePosition();
+  }
+
+ public static int compareTimeSeries(TsBlock a, int indexA, TsBlock b, int indexB, Comparator<Binary> comparator) {
+ return comparator.compare(a.getColumn(0).getBinary(indexA), b.getColumn(0).getBinary(indexB));
+ }
+
public static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
return filter == null || filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
index 6205cb766d..a6f64ff295 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/ProcessOperator.java
@@ -21,4 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
// TODO should think about what interfaces should this ProcessOperator have
-public interface ProcessOperator extends Operator {}
+public interface ProcessOperator extends Operator {
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
index 91aeecec02..d6998ef1af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LastQueryMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryMergeOperator.java
@@ -16,17 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import java.util.List;
+// Merges the last query results from different data regions; for the same time-series, the entry with the max time is selected
public class LastQueryMergeOperator implements ProcessOperator {
private final OperatorContext operatorContext;
@@ -37,11 +39,14 @@ public class LastQueryMergeOperator implements ProcessOperator {
private int currentIndex;
+ private TsBlockBuilder tsBlockBuilder;
+
public LastQueryMergeOperator(OperatorContext operatorContext, List<Operator> children) {
this.operatorContext = operatorContext;
this.children = children;
this.inputOperatorsCount = children.size();
this.currentIndex = 0;
+ this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
}
@Override
@@ -51,37 +56,26 @@ public class LastQueryMergeOperator implements ProcessOperator {
@Override
public ListenableFuture<?> isBlocked() {
- if (currentIndex < inputOperatorsCount) {
- return children.get(currentIndex).isBlocked();
- } else {
- return Futures.immediateVoidFuture();
- }
+ return ProcessOperator.super.isBlocked();
}
@Override
public TsBlock next() {
- if (children.get(currentIndex).hasNext()) {
- return children.get(currentIndex).next();
- } else {
- currentIndex++;
- return null;
- }
+ return null;
}
@Override
public boolean hasNext() {
- return currentIndex < inputOperatorsCount;
+ return false;
}
@Override
- public boolean isFinished() {
- return !hasNext();
+ public void close() throws Exception {
+ ProcessOperator.super.close();
}
@Override
- public void close() throws Exception {
- for (Operator child : children) {
- child.close();
- }
+ public boolean isFinished() {
+ return false;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
new file mode 100644
index 0000000000..7152347293
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQueryOperator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
+
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+
+// Collects all last query results in the same data region; the order of the output is not guaranteed
+public class LastQueryOperator implements ProcessOperator {
+
+ private static final int MAX_DETECT_COUNT = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+ private final OperatorContext operatorContext;
+
+ private final List<UpdateLastCacheOperator> children;
+
+ private final int inputOperatorsCount;
+
+ private int currentIndex;
+
+ private TsBlockBuilder tsBlockBuilder;
+
+
+ public LastQueryOperator(OperatorContext operatorContext, List<UpdateLastCacheOperator> children, TsBlockBuilder builder) {
+ this.operatorContext = operatorContext;
+ this.children = children;
+ this.inputOperatorsCount = children.size();
+ this.currentIndex = 0;
+ this.tsBlockBuilder = builder;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+  // Not blocked unless a child in the window [currentIndex, getEndIndex()) is still pending.
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (currentIndex >= inputOperatorsCount) {
+      return Futures.immediateVoidFuture();
+    }
+    List<ListenableFuture<?>> pending = new ArrayList<>();
+    for (int i = currentIndex, end = getEndIndex(); i < end; i++) {
+      ListenableFuture<?> childFuture = children.get(i).isBlocked();
+      if (!childFuture.isDone()) {
+        pending.add(childFuture);
+      }
+    }
+    // children that are already done are left out of the combined future
+    return pending.isEmpty() ? NOT_BLOCKED : successfulAsList(pending);
+  }
+
+  // Emits rows gathered from the children in [currentIndex, getEndIndex()); returns null when a
+  // child yields null. Once every child is consumed, flushes what is left in tsBlockBuilder.
+  @Override
+  public TsBlock next() {
+    if (currentIndex >= inputOperatorsCount) {
+      // reset like the main path does; otherwise hasNext() stays true and the rows are emitted again
+      TsBlock remaining = tsBlockBuilder.build();
+      tsBlockBuilder.reset();
+      return remaining;
+    }
+
+    // stop after maxRuntime, at the end of the window, or when the builder is full
+    long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+    int endIndex = getEndIndex();
+    while ((System.nanoTime() - start < maxRuntime) && (currentIndex < endIndex) && !tsBlockBuilder.isFull()) {
+      if (children.get(currentIndex).hasNext()) {
+        TsBlock tsBlock = children.get(currentIndex).next();
+        if (tsBlock == null) {
+          return null;
+        } else if (!tsBlock.isEmpty()) {
+          LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+        }
+      }
+      currentIndex++;
+    }
+    TsBlock res = tsBlockBuilder.build();
+    tsBlockBuilder.reset();
+    return res;
+  }
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < inputOperatorsCount || !tsBlockBuilder.isEmpty();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !hasNext();
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (Operator child : children) {
+ child.close();
+ }
+ tsBlockBuilder = null;
+ }
+
+ private int getEndIndex() {
+ return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
new file mode 100644
index 0000000000..458e2c70ef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/LastQuerySortOperator.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
+
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.compareTimeSeries;
+
+// Collects all last query results in the same data region and sorts them by the alphabetical order of their time-series
+public class LastQuerySortOperator implements ProcessOperator {
+
+ private static final int MAX_DETECT_COUNT = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+
+ // we must make sure that data in cachedTsBlock has already been sorted
+ // values that have last cache
+ private TsBlock cachedTsBlock;
+
+ private final int cachedTsBlockSize;
+
+ // read index for cachedTsBlock
+ private int cachedTsBlockRowIndex;
+
+ // we must make sure that the Operators in children have already been sorted
+ private final List<UpdateLastCacheOperator> children;
+
+ private final OperatorContext operatorContext;
+
+ private final int inputOperatorsCount;
+
+ private int currentIndex;
+
+ private final TsBlockBuilder tsBlockBuilder;
+
+ private final Comparator<Binary> timeSeriesComparator;
+
+ // used to cache the previous TsBlock obtained from children
+ private TsBlock previousTsBlock;
+
+ public LastQuerySortOperator(OperatorContext operatorContext, TsBlock cachedTsBlock, List<UpdateLastCacheOperator> children, Comparator<Binary> timeSeriesComparator) {
+ this.cachedTsBlock = cachedTsBlock;
+ this.cachedTsBlockSize = cachedTsBlock.getPositionCount();
+ this.operatorContext = operatorContext;
+ this.children = children;
+ this.inputOperatorsCount = children.size();
+ this.currentIndex = 0;
+ this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder();
+ this.timeSeriesComparator = timeSeriesComparator;
+ this.previousTsBlock = null;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+  // Not blocked unless a child in the window [currentIndex, getEndIndex()) is still pending.
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (currentIndex >= inputOperatorsCount) {
+      return Futures.immediateVoidFuture();
+    }
+    List<ListenableFuture<?>> pending = new ArrayList<>();
+    for (int i = currentIndex, end = getEndIndex(); i < end; i++) {
+      ListenableFuture<?> childFuture = children.get(i).isBlocked();
+      if (!childFuture.isDone()) {
+        pending.add(childFuture);
+      }
+    }
+    // children that are already done are left out of the combined future
+    return pending.isEmpty() ? NOT_BLOCKED : successfulAsList(pending);
+  }
+
+ @Override
+ public TsBlock next() {
+ // All children are consumed: drain previousTsBlock (emitting cached rows that compare lower first), then return the rest of cachedTsBlock together with any rows still held in tsBlockBuilder
+ if (currentIndex >= inputOperatorsCount) {
+ while (previousTsBlock != null) {
+ if (canUseDataFromCachedTsBlock(previousTsBlock)) {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+ } else {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+ previousTsBlock = null;
+ }
+ }
+ TsBlock res = cachedTsBlock.subTsBlock(cachedTsBlockRowIndex);
+ cachedTsBlockRowIndex = cachedTsBlockSize;
+ if (!tsBlockBuilder.isEmpty()) {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, res);
+ res = tsBlockBuilder.build();
+ tsBlockBuilder.reset();
+ }
+ return res;
+ }
+ // Otherwise merge child results with cachedTsBlock: while the row at cachedTsBlockRowIndex compares
+ // lower (timeSeriesComparator) than row 0 of the pending child block, the cached row is emitted first.
+ // start stopwatch
+ long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+ long start = System.nanoTime();
+ // only children in [currentIndex, endIndex) are visited during this call
+ int endIndex = getEndIndex();
+ // a null from a child makes this method return null without advancing currentIndex
+ while ((System.nanoTime() - start < maxRuntime) && (currentIndex < endIndex || previousTsBlock != null) && !tsBlockBuilder.isFull()) {
+ if (previousTsBlock != null) {
+ if (canUseDataFromCachedTsBlock(previousTsBlock)) {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+ } else {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, previousTsBlock, 0);
+ previousTsBlock = null;
+ }
+ } else {
+ if (children.get(currentIndex).hasNext()) {
+ TsBlock tsBlock = children.get(currentIndex).next();
+ if (tsBlock == null) {
+ return null;
+ } else if (!tsBlock.isEmpty()) {
+ if (canUseDataFromCachedTsBlock(tsBlock)) {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, cachedTsBlock, cachedTsBlockRowIndex++);
+ previousTsBlock = tsBlock;
+ } else {
+ LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock, 0);
+ }
+ }
+ }
+ currentIndex++;
+ }
+ }
+ // return the rows collected during this call and clear the builder for the next one
+ TsBlock res = tsBlockBuilder.build();
+ tsBlockBuilder.reset();
+ return res;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return currentIndex < inputOperatorsCount || cachedTsBlockRowIndex < cachedTsBlockSize || !tsBlockBuilder.isEmpty() || previousTsBlock != null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (Operator child : children) {
+ child.close();
+ }
+ cachedTsBlock = null;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !hasNext();
+ }
+
+ private int getEndIndex() {
+ return currentIndex + Math.min(MAX_DETECT_COUNT, inputOperatorsCount - currentIndex);
+ }
+
+ private boolean canUseDataFromCachedTsBlock(TsBlock tsBlock) {
+ return cachedTsBlockRowIndex < cachedTsBlockSize && compareTimeSeries(cachedTsBlock, cachedTsBlockRowIndex, tsBlock, 0, timeSeriesComparator) < 0;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
index 4088a13396..036295ace0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/UpdateLastCacheOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/last/UpdateLastCacheOperator.java
@@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.execution.operator.process;
+package org.apache.iotdb.db.mpp.execution.operator.process.last;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 704d49c5ea..b8aef10211 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -47,7 +47,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
@@ -56,7 +56,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
@@ -1371,9 +1371,9 @@ public class LocalExecutionPlanner {
context.instanceContext.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- LastQueryMergeOperator.class.getSimpleName());
+ LastQueryOperator.class.getSimpleName());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new LastQueryMergeOperator(operatorContext, operatorList);
+ return new LastQueryOperator(operatorContext, operatorList);
}
private Map<String, List<InputLocation>> makeLayout(PlanNode node) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
similarity index 93%
rename from server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
index c0ced72d91..5804095ffd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryOperatorTest.java
@@ -30,8 +30,8 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
-import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -60,7 +60,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class LastQueryMergeOperatorTest {
+public class LastQueryOperatorTest {
private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.LastQueryMergeOperatorTest";
private final List<String> deviceIds = new ArrayList<>();
@@ -117,7 +117,7 @@ public class LastQueryMergeOperatorTest {
PlanNodeId planNodeId5 = new PlanNodeId("5");
fragmentInstanceContext.addOperatorContext(
- 5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
+ 5, planNodeId5, LastQueryOperator.class.getSimpleName());
fragmentInstanceContext
.getOperatorContexts()
@@ -170,16 +170,16 @@ public class LastQueryMergeOperatorTest {
null,
false);
- LastQueryMergeOperator lastQueryMergeOperator =
- new LastQueryMergeOperator(
+ LastQueryOperator lastQueryOperator =
+ new LastQueryOperator(
fragmentInstanceContext.getOperatorContexts().get(4),
ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
int count = 0;
- while (!lastQueryMergeOperator.isFinished()) {
- assertTrue(lastQueryMergeOperator.isBlocked().isDone());
- assertTrue(lastQueryMergeOperator.hasNext());
- TsBlock result = lastQueryMergeOperator.next();
+ while (!lastQueryOperator.isFinished()) {
+ assertTrue(lastQueryOperator.isBlocked().isDone());
+ assertTrue(lastQueryOperator.hasNext());
+ TsBlock result = lastQueryOperator.next();
if (result == null) {
continue;
}
@@ -239,7 +239,7 @@ public class LastQueryMergeOperatorTest {
PlanNodeId planNodeId6 = new PlanNodeId("6");
fragmentInstanceContext.addOperatorContext(
- 6, planNodeId6, LastQueryMergeOperator.class.getSimpleName());
+ 6, planNodeId6, LastQueryOperator.class.getSimpleName());
fragmentInstanceContext
.getOperatorContexts()
@@ -307,17 +307,17 @@ public class LastQueryMergeOperatorTest {
new LastCacheScanOperator(
fragmentInstanceContext.getOperatorContexts().get(4), planNodeId5, tsBlock);
- LastQueryMergeOperator lastQueryMergeOperator =
- new LastQueryMergeOperator(
+ LastQueryOperator lastQueryOperator =
+ new LastQueryOperator(
fragmentInstanceContext.getOperatorContexts().get(5),
ImmutableList.of(
updateLastCacheOperator1, updateLastCacheOperator2, lastCacheScanOperator));
int count = 0;
- while (!lastQueryMergeOperator.isFinished()) {
- assertTrue(lastQueryMergeOperator.isBlocked().isDone());
- assertTrue(lastQueryMergeOperator.hasNext());
- TsBlock result = lastQueryMergeOperator.next();
+ while (!lastQueryOperator.isFinished()) {
+ assertTrue(lastQueryOperator.isBlocked().isDone());
+ assertTrue(lastQueryOperator.hasNext());
+ TsBlock result = lastQueryOperator.next();
if (result == null) {
continue;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index 581f1e703e..4567680edb 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;