You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 10:50:08 UTC

[iotdb] branch LastOperator updated (3496d28b86 -> c275c9b1db)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 3496d28b86 add test
     add ec0ca1044d fixed grafana connector sql inject (#5990)
     add b921f79749 add tokens in gitignore (#5971)
     add 107dbda6a1 [IOTDB-3250] Reactor UDF related services for mpp cluster (#5981)
     add 18638376c1 [IOTDB-3231] Return detailed failure message for opening a client (#5989)
     add acac1cfb18 Fix time iterator bug in MPP (#5992)
     add 383488a715 Distribution plan for Aggregation Query (#5982)
     new 516ea16764 Merge remote-tracking branch 'origin/master' into LastOperator
     new c275c9b1db fix ut

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .gitignore                                         |   1 +
 .../resources/conf/iotdb-confignode.properties     |  21 +
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  12 +
 .../iotdb/confignode/conf/ConfigNodeConstant.java  |   1 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   2 +
 .../controller/DatabaseConnectController.java      |   2 +-
 .../iotdb/web/grafana/dao/impl/BasicDaoImpl.java   |   2 +-
 .../IoTDBSyntaxConventionStringLiteralIT.java      |   4 +-
 .../iotdb/db/integration/IoTDBUDFManagementIT.java |  12 +-
 .../commons/udf/api/exception/UDFException.java    |   2 +-
 .../api}/exception/UDFRegistrationException.java   |   6 +-
 .../udf/builtin/BuiltinAggregationFunction.java    |  59 +++
 ...va => BuiltinTimeSeriesGeneratingFunction.java} |   4 +-
 .../iotdb/commons}/udf/service/UDFClassLoader.java |   4 +-
 .../udf/service/UDFClassLoaderManager.java         |  29 +-
 .../iotdb/commons}/udf/service/UDFLogWriter.java   |   2 +-
 .../udf/service/UDFRegistrationInformation.java    |   2 +-
 .../udf/service/UDFRegistrationService.java        |  90 ++---
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  11 +-
 .../timerangeiterator/AggrWindowIterator.java      |   9 +-
 .../timerangeiterator/PreAggrWindowIterator.java   |   9 +-
 .../PreAggrWindowWithNaturalMonthIterator.java     |   9 +-
 .../SingleTimeWindowIterator.java                  |  12 +-
 .../operator/process/TransformOperator.java        |   4 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  18 +
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  12 +-
 .../mpp/plan/analyze/GroupByLevelController.java   |  19 +-
 .../plan/expression/multi/FunctionExpression.java  |   8 +-
 .../db/mpp/plan/planner/DistributionPlanner.java   | 280 +++++++++++--
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  44 ++-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  20 +-
 .../planner/plan/node/process/AggregationNode.java |  21 +-
 .../plan/node/process/GroupByLevelNode.java        |  92 ++---
 .../{ProcessNode.java => MultiChildNode.java}      |  22 +-
 .../planner/plan/node/process/TimeJoinNode.java    |  15 +-
 .../source/AlignedSeriesAggregationScanNode.java   |  23 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |  13 +-
 .../node/source/SeriesAggregationScanNode.java     |  38 +-
 ...eNode.java => SeriesAggregationSourceNode.java} |  26 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  13 +-
 .../SinkNode.java => source/SeriesSourceNode.java} |  15 +-
 .../plan/parameter/AggregationDescriptor.java      |  86 +++-
 .../planner/plan/parameter/AggregationStep.java    |   4 +
 .../plan/parameter/GroupByLevelDescriptor.java     | 107 +++++
 .../db/mpp/transformation/dag/udf/UDTFContext.java |   2 +-
 .../mpp/transformation/dag/udf/UDTFExecutor.java   |   2 +-
 .../transformation/dag/udf/UDTFTypeInferrer.java   |   2 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  31 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  32 +-
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |   4 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  16 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  16 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |   9 +-
 .../operator/LastCacheScanOperatorTest.java        |   6 +-
 .../operator/LastQueryMergeOperatorTest.java       | 166 +++++++-
 .../SeriesAggregationScanOperatorTest.java         |   1 -
 .../operator/UpdateLastCacheOperatorTest.java      |  23 +-
 .../plan/analyze/AggregationDescriptorTest.java    | 238 +++++++++++
 .../db/mpp/plan/plan/DistributionPlannerTest.java  | 438 ++++++++++++++++++++-
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     | 134 ++-----
 .../node/process/GroupByLevelNodeSerdeTest.java    |   7 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   5 +-
 .../iotdb/rpc/ConfigNodeConnectionException.java   |   2 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 65 files changed, 1853 insertions(+), 469 deletions(-)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons/udf/api}/exception/UDFRegistrationException.java (85%)
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
 rename node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/{BuiltinFunction.java => BuiltinTimeSeriesGeneratingFunction.java} (96%)
 rename {server/src/main/java/org/apache/iotdb/db/query => node-commons/src/main/java/org/apache/iotdb/commons}/udf/service/UDFClassLoader.java (95%)
 rename {server/src/main/java/org/apache/iotdb/db/query => node-commons/src/main/java/org/apache/iotdb/commons}/udf/service/UDFClassLoaderManager.java (85%)
 rename {server/src/main/java/org/apache/iotdb/db/query => node-commons/src/main/java/org/apache/iotdb/commons}/udf/service/UDFLogWriter.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db/query => node-commons/src/main/java/org/apache/iotdb/commons}/udf/service/UDFRegistrationInformation.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db/query => node-commons/src/main/java/org/apache/iotdb/commons}/udf/service/UDFRegistrationService.java (84%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/{ProcessNode.java => MultiChildNode.java} (66%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/{SourceNode.java => SeriesAggregationSourceNode.java} (53%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/{sink/SinkNode.java => source/SeriesSourceNode.java} (70%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByLevelDescriptor.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AggregationDescriptorTest.java


[iotdb] 01/02: Merge remote-tracking branch 'origin/master' into LastOperator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 516ea16764bbd31f66afbfa24b33779f8ad87693
Merge: 3496d28b86 383488a715
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 18:04:17 2022 +0800

    Merge remote-tracking branch 'origin/master' into LastOperator

 .gitignore                                         |   1 +
 .../resources/conf/iotdb-confignode.properties     |  21 +
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  12 +
 .../iotdb/confignode/conf/ConfigNodeConstant.java  |   1 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   2 +
 .../controller/DatabaseConnectController.java      |   2 +-
 .../iotdb/web/grafana/dao/impl/BasicDaoImpl.java   |   2 +-
 .../IoTDBSyntaxConventionStringLiteralIT.java      |   4 +-
 .../iotdb/db/integration/IoTDBUDFManagementIT.java |  12 +-
 .../commons/udf/api/exception/UDFException.java    |   2 +-
 .../api}/exception/UDFRegistrationException.java   |   6 +-
 .../udf/builtin/BuiltinAggregationFunction.java    |  59 +++
 ...va => BuiltinTimeSeriesGeneratingFunction.java} |   4 +-
 .../iotdb/commons}/udf/service/UDFClassLoader.java |   4 +-
 .../udf/service/UDFClassLoaderManager.java         |  29 +-
 .../iotdb/commons}/udf/service/UDFLogWriter.java   |   2 +-
 .../udf/service/UDFRegistrationInformation.java    |   2 +-
 .../udf/service/UDFRegistrationService.java        |  90 ++---
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  11 +-
 .../timerangeiterator/AggrWindowIterator.java      |   9 +-
 .../timerangeiterator/PreAggrWindowIterator.java   |   9 +-
 .../PreAggrWindowWithNaturalMonthIterator.java     |   9 +-
 .../SingleTimeWindowIterator.java                  |  12 +-
 .../operator/process/TransformOperator.java        |   4 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  18 +
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  12 +-
 .../mpp/plan/analyze/GroupByLevelController.java   |  19 +-
 .../plan/expression/multi/FunctionExpression.java  |   8 +-
 .../db/mpp/plan/planner/DistributionPlanner.java   | 280 +++++++++++--
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  44 ++-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  20 +-
 .../planner/plan/node/process/AggregationNode.java |  21 +-
 .../plan/node/process/GroupByLevelNode.java        |  92 ++---
 .../planner/plan/node/process/MultiChildNode.java} |  27 +-
 .../planner/plan/node/process/TimeJoinNode.java    |  15 +-
 .../source/AlignedSeriesAggregationScanNode.java   |  23 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |  13 +-
 .../node/source/SeriesAggregationScanNode.java     |  38 +-
 .../node/source/SeriesAggregationSourceNode.java   |  46 +++
 .../planner/plan/node/source/SeriesScanNode.java   |  13 +-
 .../plan/node/source/SeriesSourceNode.java}        |  20 +-
 .../plan/parameter/AggregationDescriptor.java      |  86 +++-
 .../planner/plan/parameter/AggregationStep.java    |   4 +
 .../plan/parameter/GroupByLevelDescriptor.java     | 107 +++++
 .../db/mpp/transformation/dag/udf/UDTFContext.java |   2 +-
 .../mpp/transformation/dag/udf/UDTFExecutor.java   |   2 +-
 .../transformation/dag/udf/UDTFTypeInferrer.java   |   2 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  31 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  32 +-
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |   4 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  16 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  16 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |   9 +-
 .../plan/analyze/AggregationDescriptorTest.java    | 238 +++++++++++
 .../db/mpp/plan/plan/DistributionPlannerTest.java  | 438 ++++++++++++++++++++-
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     | 134 ++-----
 .../node/process/GroupByLevelNodeSerdeTest.java    |   7 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   5 +-
 .../iotdb/rpc/ConfigNodeConnectionException.java   |   2 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 61 files changed, 1702 insertions(+), 454 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 5844f784d6,736bc5feef..9e7d99cae9
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@@ -99,9 -93,8 +99,9 @@@ import org.apache.iotdb.db.mpp.executio
  import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
  import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
  import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
- import org.apache.iotdb.db.mpp.plan.expression.Expression;
+ import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
  import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
  import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildNodesSchemaScanNode;
  import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.ChildPathsSchemaScanNode;


[iotdb] 02/02: fix ut

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c275c9b1db8e8229715e48082cd24683ae787fde
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 18:49:59 2022 +0800

    fix ut
---
 .../operator/LastCacheScanOperatorTest.java        |   6 +-
 .../operator/LastQueryMergeOperatorTest.java       | 166 ++++++++++++++++++++-
 .../SeriesAggregationScanOperatorTest.java         |   1 -
 .../operator/UpdateLastCacheOperatorTest.java      |  23 ++-
 4 files changed, 180 insertions(+), 16 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
index 4c9bd25876..72ee86fb65 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastCacheScanOperatorTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
 import org.junit.Test;
 
 import java.util.concurrent.ExecutorService;
@@ -56,7 +57,6 @@ public class LastCacheScanOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, SeriesScanOperator.class.getSimpleName());
 
-
       TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
 
       LastQueryUtil.appendLastValue(builder, 1, "root.sg.d.s1", "true", "BOOLEAN");
@@ -68,7 +68,9 @@ public class LastCacheScanOperatorTest {
 
       TsBlock tsBlock = builder.build();
 
-      LastCacheScanOperator lastCacheScanOperator = new LastCacheScanOperator(fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0), planNodeId1, tsBlock);
 
       assertTrue(lastCacheScanOperator.isBlocked().isDone());
       assertTrue(lastCacheScanOperator.hasNext());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
index 49f4e55a26..03fa1a1584 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LastQueryMergeOperatorTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -34,13 +32,18 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LastQueryMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
+import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -115,7 +118,6 @@ public class LastQueryMergeOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           5, planNodeId5, LastQueryMergeOperator.class.getSimpleName());
 
-
       SeriesAggregationScanOperator seriesAggregationScanOperator1 =
           new SeriesAggregationScanOperator(
               planNodeId1,
@@ -129,7 +131,14 @@ public class LastQueryMergeOperatorTest {
       seriesAggregationScanOperator1.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
-      UpdateLastCacheOperator updateLastCacheOperator1 =  new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(1), seriesAggregationScanOperator1, measurementPath1, measurementPath1.getSeriesType(), null, false);
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              seriesAggregationScanOperator1,
+              measurementPath1,
+              measurementPath1.getSeriesType(),
+              null,
+              false);
 
       SeriesAggregationScanOperator seriesAggregationScanOperator2 =
           new SeriesAggregationScanOperator(
@@ -144,27 +153,41 @@ public class LastQueryMergeOperatorTest {
       seriesAggregationScanOperator2.initQueryDataSource(
           new QueryDataSource(seqResources, unSeqResources));
 
-      UpdateLastCacheOperator updateLastCacheOperator2 =  new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(3), seriesAggregationScanOperator2, measurementPath2, measurementPath2.getSeriesType(), null, false);
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              seriesAggregationScanOperator2,
+              measurementPath2,
+              measurementPath2.getSeriesType(),
+              null,
+              false);
 
-      LastQueryMergeOperator lastQueryMergeOperator = new LastQueryMergeOperator(fragmentInstanceContext.getOperatorContexts().get(4), ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4),
+              ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2));
 
       int count = 0;
       while (!lastQueryMergeOperator.isFinished()) {
         assertTrue(lastQueryMergeOperator.isBlocked().isDone());
         assertTrue(lastQueryMergeOperator.hasNext());
         TsBlock result = lastQueryMergeOperator.next();
+        if (result == null) {
+          continue;
+        }
         assertEquals(3, result.getValueColumnCount());
 
         for (int i = 0; i < result.getPositionCount(); i++) {
           assertEquals(499, result.getTimeByIndex(i));
-          assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count, result.getColumn(0).getBinary(i).toString());
+          assertEquals(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+              result.getColumn(0).getBinary(i).toString());
           assertEquals("10499", result.getColumn(1).getBinary(i).toString());
           assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
           count++;
         }
       }
 
-
     } catch (IllegalPathException e) {
       e.printStackTrace();
       fail();
@@ -173,5 +196,132 @@ public class LastQueryMergeOperatorTest {
 
   @Test
   public void testUpdateLastCacheOperatorTestWithCachedValue() {
+    try {
+      List<Aggregator> aggregators1 = LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+      List<Aggregator> aggregators2 = LastQueryUtil.createAggregators(TSDataType.INT32);
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor1", TSDataType.INT32);
+      Set<String> allSensors = Sets.newHashSet("sensor0", "sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId3 = new PlanNodeId("3");
+      fragmentInstanceContext.addOperatorContext(
+          3, planNodeId3, SeriesAggregationScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId4 = new PlanNodeId("4");
+      fragmentInstanceContext.addOperatorContext(
+          4, planNodeId4, UpdateLastCacheOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId5 = new PlanNodeId("5");
+      fragmentInstanceContext.addOperatorContext(
+          5, planNodeId4, LastCacheScanOperator.class.getSimpleName());
+
+      PlanNodeId planNodeId6 = new PlanNodeId("6");
+      fragmentInstanceContext.addOperatorContext(
+          6, planNodeId6, LastQueryMergeOperator.class.getSimpleName());
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator1 =
+          new SeriesAggregationScanOperator(
+              planNodeId1,
+              measurementPath1,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              aggregators1,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator1.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator1 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              seriesAggregationScanOperator1,
+              measurementPath1,
+              measurementPath1.getSeriesType(),
+              null,
+              false);
+
+      SeriesAggregationScanOperator seriesAggregationScanOperator2 =
+          new SeriesAggregationScanOperator(
+              planNodeId3,
+              measurementPath2,
+              allSensors,
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              aggregators2,
+              null,
+              false,
+              null);
+      seriesAggregationScanOperator2.initQueryDataSource(
+          new QueryDataSource(seqResources, unSeqResources));
+
+      UpdateLastCacheOperator updateLastCacheOperator2 =
+          new UpdateLastCacheOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              seriesAggregationScanOperator2,
+              measurementPath2,
+              measurementPath2.getSeriesType(),
+              null,
+              false);
+
+      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(6);
+
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor2", "10499", "INT32");
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor3", "10499", "INT32");
+      LastQueryUtil.appendLastValue(
+          builder, 499, SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor4", "10499", "INT32");
+
+      TsBlock tsBlock = builder.build();
+
+      LastCacheScanOperator lastCacheScanOperator =
+          new LastCacheScanOperator(
+              fragmentInstanceContext.getOperatorContexts().get(4), planNodeId5, tsBlock);
+
+      LastQueryMergeOperator lastQueryMergeOperator =
+          new LastQueryMergeOperator(
+              fragmentInstanceContext.getOperatorContexts().get(5),
+              ImmutableList.of(
+                  updateLastCacheOperator1, updateLastCacheOperator2, lastCacheScanOperator));
+
+      int count = 0;
+      while (!lastQueryMergeOperator.isFinished()) {
+        assertTrue(lastQueryMergeOperator.isBlocked().isDone());
+        assertTrue(lastQueryMergeOperator.hasNext());
+        TsBlock result = lastQueryMergeOperator.next();
+        if (result == null) {
+          continue;
+        }
+        assertEquals(3, result.getValueColumnCount());
+
+        for (int i = 0; i < result.getPositionCount(); i++) {
+          assertEquals(499, result.getTimeByIndex(i));
+          assertEquals(
+              SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor" + count,
+              result.getColumn(0).getBinary(i).toString());
+          assertEquals("10499", result.getColumn(1).getBinary(i).toString());
+          assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(i).toString());
+          count++;
+        }
+      }
+
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
index c0c1adec30..7e95c4ac2c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SeriesAggregationScanOperatorTest.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
index a6af1d1f34..b78cb7b1ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/UpdateLastCacheOperatorTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator;
 
-import com.google.common.collect.Sets;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -42,6 +41,8 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -96,7 +97,9 @@ public class UpdateLastCacheOperatorTest {
       assertEquals(3, result.getValueColumnCount());
 
       assertEquals(499, result.getTimeByIndex(0));
-      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals(
+          SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+          result.getColumn(0).getBinary(0).toString());
       assertEquals("10499", result.getColumn(1).getBinary(0).toString());
       assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
 
@@ -124,7 +127,9 @@ public class UpdateLastCacheOperatorTest {
       assertEquals(3, result.getValueColumnCount());
 
       assertEquals(499, result.getTimeByIndex(0));
-      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals(
+          SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+          result.getColumn(0).getBinary(0).toString());
       assertEquals("10499", result.getColumn(1).getBinary(0).toString());
       assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
 
@@ -152,7 +157,9 @@ public class UpdateLastCacheOperatorTest {
       assertEquals(3, result.getValueColumnCount());
 
       assertEquals(120, result.getTimeByIndex(0));
-      assertEquals(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", result.getColumn(0).getBinary(0).toString());
+      assertEquals(
+          SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0",
+          result.getColumn(0).getBinary(0).toString());
       assertEquals("20120", result.getColumn(1).getBinary(0).toString());
       assertEquals(TSDataType.INT32.name(), result.getColumn(2).getBinary(0).toString());
 
@@ -201,6 +208,12 @@ public class UpdateLastCacheOperatorTest {
     seriesAggregationScanOperator.initQueryDataSource(
         new QueryDataSource(seqResources, unSeqResources));
 
-    return new UpdateLastCacheOperator(fragmentInstanceContext.getOperatorContexts().get(1), seriesAggregationScanOperator, measurementPath, measurementPath.getSeriesType(), null, false);
+    return new UpdateLastCacheOperator(
+        fragmentInstanceContext.getOperatorContexts().get(1),
+        seriesAggregationScanOperator,
+        measurementPath,
+        measurementPath.getSeriesType(),
+        null,
+        false);
   }
 }