You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/23 01:59:21 UTC

[iotdb] branch LastNode created (now 1e682b674a)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch LastNode
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 1e682b674a [IOTDB-3244] Add PlanNodes related to last query

This branch includes the following new commits:

     new 1e682b674a [IOTDB-3244] Add PlanNodes related to last query

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-3244] Add PlanNodes related to last query

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch LastNode
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1e682b674a1d3f219844273af536080dac4510f1
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Mon May 23 09:58:47 2022 +0800

    [IOTDB-3244] Add PlanNodes related to last query
---
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |  14 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  15 ++
 .../plan/node/process/LastQueryMergeNode.java      | 110 +++++++++++++
 .../plan/node/source/AlignedLastQueryScanNode.java | 171 ++++++++++++++++++++
 .../source/AlignedSeriesAggregationScanNode.java   |   6 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |  13 +-
 .../plan/node/source/LastQueryScanNode.java        | 173 +++++++++++++++++++++
 .../node/source/SeriesAggregationScanNode.java     |   6 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  13 +-
 9 files changed, 486 insertions(+), 35 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 5daa489ea9..f1c63c06dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
@@ -50,8 +51,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -107,7 +110,10 @@ public enum PlanNodeType {
   CREATE_MULTI_TIME_SERIES((short) 39),
   CHILD_PATHS_SCAN((short) 40),
   CHILD_NODES_SCAN((short) 41),
-  NODE_MANAGEMENT_MEMORY_MERGE((short) 42);
+  NODE_MANAGEMENT_MEMORY_MERGE((short) 42),
+  LAST_QUERY_SCAN((short) 43),
+  ALIGNED_LAST_QUERY_SCAN((short) 44),
+  LAST_QUERY_MERGE((short) 45);
 
   private final short nodeType;
 
@@ -217,6 +223,12 @@ public enum PlanNodeType {
         return ChildNodesSchemaScanNode.deserialize(buffer);
       case 42:
         return NodeManagementMemoryMergeNode.deserialize(buffer);
+      case 43:
+        return LastQueryScanNode.deserialize(buffer);
+      case 44:
+        return AlignedLastQueryScanNode.deserialize(buffer);
+      case 45:
+        return LastQueryMergeNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 53132f3d0c..d60df01d36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNullNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTimeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
@@ -51,8 +52,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -240,4 +243,16 @@ public abstract class PlanVisitor<R, C> {
   public R visitNodeManagementMemoryMerge(NodeManagementMemoryMergeNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitLastQueryScan(LastQueryScanNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitAlignedLastQueryScan(AlignedLastQueryScanNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitLastQueryMerge(LastQueryMergeNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
new file mode 100644
index 0000000000..e8223a42d0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LastQueryMergeNode.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_COLUMN_HEADERS;
+
+public class LastQueryMergeNode extends ProcessNode {
+
+  // make sure child in list has been ordered by their sensor name
+  private List<PlanNode> children;
+
+  public LastQueryMergeNode(PlanNodeId id) {
+    super(id);
+    this.children = new ArrayList<>();
+  }
+
+  public LastQueryMergeNode(PlanNodeId id, List<PlanNode> children) {
+    super(id);
+    this.children = children;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return children;
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    children.add(child);
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new LastQueryMergeNode(getPlanNodeId());
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return CHILD_COUNT_NO_LIMIT;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return LAST_QUERY_COLUMN_HEADERS;
+  }
+
+  @Override
+  public String toString() {
+    return "LastQueryMergeNode-" + this.getPlanNodeId();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    LastQueryMergeNode that = (LastQueryMergeNode) o;
+    return Objects.equals(children, that.children);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), children);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitLastQueryMerge(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.LAST_QUERY_MERGE.serialize(byteBuffer);
+  }
+
+  public static LastQueryMergeNode deserialize(ByteBuffer byteBuffer) {
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new LastQueryMergeNode(planNodeId);
+  }
+
+  public void setChildren(List<PlanNode> children) {
+    this.children = children;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
new file mode 100644
index 0000000000..837bdbace4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedLastQueryScanNode.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_COLUMN_HEADERS;
+
+public class AlignedLastQueryScanNode extends SourceNode {
+  // The path of the target series which will be scanned.
+  private final AlignedPath seriesPath;
+
+  private Filter timeFilter;
+
+  // The id of DataRegion where the node will run
+  private TRegionReplicaSet regionReplicaSet;
+
+  public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath, Filter timeFilter) {
+    super(id);
+    this.seriesPath = seriesPath;
+    this.timeFilter = timeFilter;
+  }
+
+  public AlignedLastQueryScanNode(
+      PlanNodeId id,
+      AlignedPath seriesPath,
+      Filter timeFilter,
+      TRegionReplicaSet regionReplicaSet) {
+    super(id);
+    this.seriesPath = seriesPath;
+    this.timeFilter = timeFilter;
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public void open() throws Exception {}
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  @Override
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    throw new UnsupportedOperationException("no child is allowed for SeriesScanNode");
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new AlignedLastQueryScanNode(getPlanNodeId(), seriesPath, timeFilter, regionReplicaSet);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return LAST_QUERY_COLUMN_HEADERS;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAlignedLastQueryScan(this, context);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    AlignedLastQueryScanNode that = (AlignedLastQueryScanNode) o;
+    return Objects.equals(seriesPath, that.seriesPath)
+        && Objects.equals(timeFilter, that.timeFilter)
+        && Objects.equals(regionReplicaSet, that.regionReplicaSet);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), seriesPath, timeFilter, regionReplicaSet);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "AlignedLastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+        this.getPlanNodeId(), this.getSeriesPath(), this.getRegionReplicaSet());
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(byteBuffer);
+    seriesPath.serialize(byteBuffer);
+    if (timeFilter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      timeFilter.serialize(byteBuffer);
+    }
+  }
+
+  public static AlignedLastQueryScanNode deserialize(ByteBuffer byteBuffer) {
+    AlignedPath partialPath = (AlignedPath) PathDeserializeUtil.deserialize(byteBuffer);
+    Filter timeFilter = null;
+    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+      timeFilter = FilterFactory.deserialize(byteBuffer);
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new AlignedLastQueryScanNode(planNodeId, partialPath, timeFilter);
+  }
+
+  public AlignedPath getSeriesPath() {
+    return seriesPath;
+  }
+
+  @Nullable
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
+
+  public void setTimeFilter(@Nullable Filter timeFilter) {
+    this.timeFilter = timeFilter;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 0345031d07..2706934e06 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -207,7 +206,6 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
       ReadWriteIOUtils.write((byte) 1, byteBuffer);
       groupByTimeParameter.serialize(byteBuffer);
     }
-    ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, byteBuffer);
   }
 
   public static AlignedSeriesAggregationScanNode deserialize(ByteBuffer byteBuffer) {
@@ -228,8 +226,6 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
     if (isNull == 1) {
       groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
     }
-    TRegionReplicaSet regionReplicaSet =
-        ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new AlignedSeriesAggregationScanNode(
         planNodeId,
@@ -238,7 +234,7 @@ public class AlignedSeriesAggregationScanNode extends SourceNode {
         scanOrder,
         timeFilter,
         groupByTimeParameter,
-        regionReplicaSet);
+        null);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 72c2a24244..6e4bec3d77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -202,7 +201,6 @@ public class AlignedSeriesScanNode extends SourceNode {
     }
     ReadWriteIOUtils.write(limit, byteBuffer);
     ReadWriteIOUtils.write(offset, byteBuffer);
-    ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, byteBuffer);
   }
 
   public static AlignedSeriesScanNode deserialize(ByteBuffer byteBuffer) {
@@ -220,18 +218,9 @@ public class AlignedSeriesScanNode extends SourceNode {
     }
     int limit = ReadWriteIOUtils.readInt(byteBuffer);
     int offset = ReadWriteIOUtils.readInt(byteBuffer);
-    TRegionReplicaSet dataRegionReplicaSet =
-        ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new AlignedSeriesScanNode(
-        planNodeId,
-        alignedPath,
-        scanOrder,
-        timeFilter,
-        valueFilter,
-        limit,
-        offset,
-        dataRegionReplicaSet);
+        planNodeId, alignedPath, scanOrder, timeFilter, valueFilter, limit, offset, null);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
new file mode 100644
index 0000000000..655d546b24
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/LastQueryScanNode.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+
+public class LastQueryScanNode extends SourceNode {
+
+  public static final List<String> LAST_QUERY_COLUMN_HEADERS =
+      ImmutableList.of("SensorId", "Time", "Value", "DataType");
+
+  // The path of the target series which will be scanned.
+  private final MeasurementPath seriesPath;
+
+  private Filter timeFilter;
+
+  // The id of DataRegion where the node will run
+  private TRegionReplicaSet regionReplicaSet;
+
+  public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, Filter timeFilter) {
+    super(id);
+    this.seriesPath = seriesPath;
+    this.timeFilter = timeFilter;
+  }
+
+  public LastQueryScanNode(
+      PlanNodeId id,
+      MeasurementPath seriesPath,
+      Filter timeFilter,
+      TRegionReplicaSet regionReplicaSet) {
+    super(id);
+    this.seriesPath = seriesPath;
+    this.timeFilter = timeFilter;
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public void open() throws Exception {}
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  @Override
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public MeasurementPath getSeriesPath() {
+    return seriesPath;
+  }
+
+  @Nullable
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
+
+  public void setTimeFilter(@Nullable Filter timeFilter) {
+    this.timeFilter = timeFilter;
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    throw new UnsupportedOperationException("no child is allowed for SeriesScanNode");
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new LastQueryScanNode(getPlanNodeId(), seriesPath, timeFilter, regionReplicaSet);
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return LAST_QUERY_COLUMN_HEADERS;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitLastQueryScan(this, context);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
+    LastQueryScanNode that = (LastQueryScanNode) o;
+    return Objects.equals(seriesPath, that.seriesPath)
+        && Objects.equals(timeFilter, that.timeFilter)
+        && Objects.equals(regionReplicaSet, that.regionReplicaSet);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), seriesPath, timeFilter, regionReplicaSet);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "LastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+        this.getPlanNodeId(), this.getSeriesPath(), this.getRegionReplicaSet());
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer);
+    seriesPath.serialize(byteBuffer);
+    if (timeFilter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      timeFilter.serialize(byteBuffer);
+    }
+  }
+
+  public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) {
+    MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer);
+    Filter timeFilter = null;
+    if (!ReadWriteIOUtils.readIsNull(byteBuffer)) {
+      timeFilter = FilterFactory.deserialize(byteBuffer);
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new LastQueryScanNode(planNodeId, partialPath, timeFilter);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index ced35c4fb4..817fc0a132 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -224,7 +223,6 @@ public class SeriesAggregationScanNode extends SourceNode {
       ReadWriteIOUtils.write((byte) 1, byteBuffer);
       groupByTimeParameter.serialize(byteBuffer);
     }
-    ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, byteBuffer);
   }
 
   public static SeriesAggregationScanNode deserialize(ByteBuffer byteBuffer) {
@@ -245,8 +243,6 @@ public class SeriesAggregationScanNode extends SourceNode {
     if (isNull == 1) {
       groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
     }
-    TRegionReplicaSet regionReplicaSet =
-        ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new SeriesAggregationScanNode(
         planNodeId,
@@ -255,7 +251,7 @@ public class SeriesAggregationScanNode extends SourceNode {
         scanOrder,
         timeFilter,
         groupByTimeParameter,
-        regionReplicaSet);
+        null);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
index 3b3fe8f05c..eb3eb1937a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -213,7 +212,6 @@ public class SeriesScanNode extends SourceNode {
     }
     ReadWriteIOUtils.write(limit, byteBuffer);
     ReadWriteIOUtils.write(offset, byteBuffer);
-    ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, byteBuffer);
   }
 
   public static SeriesScanNode deserialize(ByteBuffer byteBuffer) {
@@ -231,18 +229,9 @@ public class SeriesScanNode extends SourceNode {
     }
     int limit = ReadWriteIOUtils.readInt(byteBuffer);
     int offset = ReadWriteIOUtils.readInt(byteBuffer);
-    TRegionReplicaSet dataRegionReplicaSet =
-        ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new SeriesScanNode(
-        planNodeId,
-        partialPath,
-        scanOrder,
-        timeFilter,
-        valueFilter,
-        limit,
-        offset,
-        dataRegionReplicaSet);
+        planNodeId, partialPath, scanOrder, timeFilter, valueFilter, limit, offset, null);
   }
 
   @Override