You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/02/26 13:13:00 UTC

[iotdb] branch master updated: [IOTDB-4898] Push offset and limit down to ScanOperator if possible

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 415f68e36b [IOTDB-4898] Push offset and limit down to ScanOperator if possible
415f68e36b is described below

commit 415f68e36b88659b1e231be2b047483ea791337a
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Sun Feb 26 21:12:54 2023 +0800

    [IOTDB-4898] Push offset and limit down to ScanOperator if possible
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  12 +-
 .../traverser/TraverserWithLimitOffsetWrapper.java |   6 +-
 .../impl/read/AbstractShowSchemaPlanImpl.java      |  10 +-
 .../impl/read/SchemaRegionReadPlanFactory.java     |   6 +-
 .../impl/read/ShowDevicesPlanImpl.java             |   2 +-
 .../impl/read/ShowTimeSeriesPlanImpl.java          |   4 +-
 .../plan/schemaregion/read/IShowSchemaPlan.java    |   4 +-
 .../apache/iotdb/db/metadata/tag/TagManager.java   |   4 +-
 .../operator/schema/source/DeviceSchemaSource.java |   6 +-
 .../schema/source/SchemaSourceFactory.java         |   6 +-
 .../schema/source/TimeSeriesSchemaSource.java      |   8 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |   4 +
 .../plan/analyze/ColumnPaginationController.java   |  10 +-
 .../db/mpp/plan/analyze/ExpressionAnalyzer.java    |  34 +++
 .../memory/StatementMemorySourceVisitor.java       |   5 +-
 .../db/mpp/plan/expression/ExpressionFactory.java  |   4 +
 .../mpp/plan/optimization/LimitOffsetPushDown.java | 241 +++++++++++++++++
 .../db/mpp/plan/optimization/PlanOptimizer.java    |   3 +-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  16 +-
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  10 +-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |   8 +-
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |   2 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |   2 +
 .../planner/distribution/DistributionPlanner.java  |  23 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  16 ++
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  51 ++--
 .../node/metedata/read/DevicesSchemaScanNode.java  |   8 +-
 .../node/metedata/read/SchemaQueryScanNode.java    |  12 +-
 .../metedata/read/TimeSeriesSchemaScanNode.java    |   8 +-
 .../plan/planner/plan/node/process/LimitNode.java  |  10 +-
 .../plan/planner/plan/node/process/OffsetNode.java |  10 +-
 .../plan/node/source/AlignedSeriesScanNode.java    |  24 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  20 +-
 .../iotdb/db/mpp/plan/statement/Statement.java     |   2 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |  37 ++-
 .../mpp/plan/statement/metadata/ShowStatement.java |  12 +-
 .../plan/statement/sys/ShowQueriesStatement.java   |  17 +-
 .../plan/optimization/LimitOffsetPushDownTest.java | 299 +++++++++++++++++++++
 .../db/mpp/plan/optimization/TestPlanBuilder.java  | 184 +++++++++++++
 .../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java |   1 -
 40 files changed, 994 insertions(+), 147 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 7a8c008ef5..5f04e5406d 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -531,13 +531,17 @@ paginationClause
     ;
 
 rowPaginationClause
-    : limitClause offsetClause?
-    | offsetClause? limitClause
+    : limitClause
+    | offsetClause
+    | offsetClause limitClause
+    | limitClause offsetClause
     ;
 
 seriesPaginationClause
-    : slimitClause soffsetClause?
-    | soffsetClause? slimitClause
+    : slimitClause
+    | soffsetClause
+    | soffsetClause slimitClause
+    | slimitClause soffsetClause
     ;
 
 limitClause
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java
index d357e951e4..b6fb42b6ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/TraverserWithLimitOffsetWrapper.java
@@ -26,14 +26,14 @@ import java.util.NoSuchElementException;
 
 public class TraverserWithLimitOffsetWrapper<R> extends Traverser<R> {
   private final Traverser<R> traverser;
-  private final int limit;
-  private final int offset;
+  private final long limit;
+  private final long offset;
   private final boolean hasLimit;
 
   private int count = 0;
   int curOffset = 0;
 
-  public TraverserWithLimitOffsetWrapper(Traverser<R> traverser, int limit, int offset) {
+  public TraverserWithLimitOffsetWrapper(Traverser<R> traverser, long limit, long offset) {
     this.traverser = traverser;
     this.limit = limit;
     this.offset = offset;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java
index d0d05105d3..f81186c574 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/AbstractShowSchemaPlanImpl.java
@@ -27,8 +27,8 @@ import java.util.Objects;
 public abstract class AbstractShowSchemaPlanImpl implements IShowSchemaPlan {
 
   protected final PartialPath path;
-  protected final int limit;
-  protected final int offset;
+  protected final long limit;
+  protected final long offset;
   protected final boolean isPrefixMatch;
 
   protected AbstractShowSchemaPlanImpl(PartialPath path) {
@@ -38,7 +38,7 @@ public abstract class AbstractShowSchemaPlanImpl implements IShowSchemaPlan {
     this.isPrefixMatch = false;
   }
 
-  AbstractShowSchemaPlanImpl(PartialPath path, int limit, int offset, boolean isPrefixMatch) {
+  AbstractShowSchemaPlanImpl(PartialPath path, long limit, long offset, boolean isPrefixMatch) {
     this.path = path;
     this.limit = limit;
     this.offset = offset;
@@ -51,12 +51,12 @@ public abstract class AbstractShowSchemaPlanImpl implements IShowSchemaPlan {
   }
 
   @Override
-  public int getLimit() {
+  public long getLimit() {
     return limit;
   }
 
   @Override
-  public int getOffset() {
+  public long getOffset() {
     return offset;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java
index 2afdb8b887..c024119fff 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/SchemaRegionReadPlanFactory.java
@@ -41,7 +41,7 @@ public class SchemaRegionReadPlanFactory {
   }
 
   public static IShowDevicesPlan getShowDevicesPlan(
-      PartialPath path, int limit, int offset, boolean isPrefixMatch) {
+      PartialPath path, long limit, long offset, boolean isPrefixMatch) {
     return new ShowDevicesPlanImpl(path, limit, offset, isPrefixMatch, -1);
   }
 
@@ -76,8 +76,8 @@ public class SchemaRegionReadPlanFactory {
       boolean isContains,
       String key,
       String value,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       boolean isPrefixMatch) {
     return new ShowTimeSeriesPlanImpl(
         path, relatedTemplate, isContains, key, value, limit, offset, isPrefixMatch);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java
index f740f8e27b..e4153519fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java
@@ -31,7 +31,7 @@ public class ShowDevicesPlanImpl extends AbstractShowSchemaPlanImpl implements I
   private final int schemaTemplateId;
 
   ShowDevicesPlanImpl(
-      PartialPath path, int limit, int offset, boolean isPrefixMatch, int schemaTemplateId) {
+      PartialPath path, long limit, long offset, boolean isPrefixMatch, int schemaTemplateId) {
     super(path, limit, offset, isPrefixMatch);
     this.schemaTemplateId = schemaTemplateId;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java
index 0906b83b34..be857877cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowTimeSeriesPlanImpl.java
@@ -42,8 +42,8 @@ public class ShowTimeSeriesPlanImpl extends AbstractShowSchemaPlanImpl
       boolean isContains,
       String key,
       String value,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       boolean isPrefixMatch) {
     super(path, limit, offset, isPrefixMatch);
     this.relatedTemplate = relatedTemplate;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java
index fb28d91af7..d86e8f2857 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/read/IShowSchemaPlan.java
@@ -39,9 +39,9 @@ public interface IShowSchemaPlan extends ISchemaRegionPlan {
 
   PartialPath getPath();
 
-  int getLimit();
+  long getLimit();
 
-  int getOffset();
+  long getOffset();
 
   boolean isPrefixMatch();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
index ca1cc56990..206e0bb80c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java
@@ -206,8 +206,8 @@ public class TagManager {
     PartialPath pathPattern = plan.getPath();
     int curOffset = 0;
     int count = 0;
-    int limit = plan.getLimit();
-    int offset = plan.getOffset();
+    long limit = plan.getLimit();
+    long offset = plan.getOffset();
     boolean hasLimit = limit > 0 || offset > 0;
     while (curOffset < offset && allMatchedNodes.hasNext()) {
       IMeasurementMNode node = allMatchedNodes.next();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java
index b606531369..2abebdfff9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/DeviceSchemaSource.java
@@ -37,13 +37,13 @@ public class DeviceSchemaSource implements ISchemaSource<IDeviceSchemaInfo> {
   private final PartialPath pathPattern;
   private final boolean isPrefixMatch;
 
-  private final int limit;
-  private final int offset;
+  private final long limit;
+  private final long offset;
 
   private final boolean hasSgCol;
 
   DeviceSchemaSource(
-      PartialPath pathPattern, boolean isPrefixPath, int limit, int offset, boolean hasSgCol) {
+      PartialPath pathPattern, boolean isPrefixPath, long limit, long offset, boolean hasSgCol) {
     this.pathPattern = pathPattern;
     this.isPrefixMatch = isPrefixPath;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
index c0c4305ea9..16e58d2a44 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/SchemaSourceFactory.java
@@ -46,8 +46,8 @@ public class SchemaSourceFactory {
   public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaSource(
       PartialPath pathPattern,
       boolean isPrefixMatch,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       String key,
       String value,
       boolean isContains,
@@ -62,7 +62,7 @@ public class SchemaSourceFactory {
   }
 
   public static ISchemaSource<IDeviceSchemaInfo> getDeviceSchemaSource(
-      PartialPath pathPattern, boolean isPrefixPath, int limit, int offset, boolean hasSgCol) {
+      PartialPath pathPattern, boolean isPrefixPath, long limit, long offset, boolean hasSgCol) {
     return new DeviceSchemaSource(pathPattern, isPrefixPath, limit, offset, hasSgCol);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java
index 337aaacd0a..1ad90c5f80 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/source/TimeSeriesSchemaSource.java
@@ -41,8 +41,8 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
   private final PartialPath pathPattern;
   private final boolean isPrefixMatch;
 
-  private final int limit;
-  private final int offset;
+  private final long limit;
+  private final long offset;
 
   private final String key;
   private final String value;
@@ -53,8 +53,8 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
   TimeSeriesSchemaSource(
       PartialPath pathPattern,
       boolean isPrefixMatch,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       String key,
       String value,
       boolean isContains,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index 14ee67b535..bf15b2a826 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -582,4 +582,8 @@ public class Analysis {
   public void setVirtualSource(boolean virtualSource) {
     isVirtualSource = virtualSource;
   }
+
+  public Map<NodeRef<Expression>, TSDataType> getExpressionTypes() {
+    return expressionTypes;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java
index 082428664d..730b79f7e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ColumnPaginationController.java
@@ -25,17 +25,17 @@ import org.apache.iotdb.db.exception.sql.PathNumOverLimitException;
 /** apply MaxQueryDeduplicatedPathNum and SLIMIT & SOFFSET */
 public class ColumnPaginationController {
 
-  private int curLimit =
-      IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1;
-  private int curOffset;
+  private long curLimit =
+      IoTDBDescriptor.getInstance().getConfig().getMaxQueryDeduplicatedPathNum() + 1L;
+  private long curOffset;
 
   // records the path number that the SchemaProcessor totally returned
-  private int consumed = 0;
+  private long consumed = 0;
 
   // for ALIGN BY DEVICE / DISABLE ALIGN / GROUP BY LEVEL / LAST, controller does is disabled
   private final boolean isDisabled;
 
-  public ColumnPaginationController(int seriesLimit, int seriesOffset, boolean isDisabled) {
+  public ColumnPaginationController(long seriesLimit, long seriesOffset, boolean isDisabled) {
     // for series limit, the default value is 0, which means no limit
     this.curLimit = seriesLimit == 0 ? this.curLimit : Math.min(seriesLimit, this.curLimit);
     // series offset for result set. The default value is 0
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
index 6f35ebfb60..2cbda77f77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ExpressionAnalyzer.java
@@ -1342,4 +1342,38 @@ public class ExpressionAnalyzer {
           "unsupported expression type: " + expression.getExpressionType());
     }
   }
+
+  public static boolean checkIsScalarExpression(Expression expression, Analysis analysis) {
+    if (expression instanceof TernaryExpression) {
+      TernaryExpression ternaryExpression = (TernaryExpression) expression;
+      return checkIsScalarExpression(ternaryExpression.getFirstExpression(), analysis)
+          && checkIsScalarExpression(ternaryExpression.getSecondExpression(), analysis)
+          && checkIsScalarExpression(ternaryExpression.getThirdExpression(), analysis);
+    } else if (expression instanceof BinaryExpression) {
+      BinaryExpression binaryExpression = (BinaryExpression) expression;
+      return checkIsScalarExpression(binaryExpression.getLeftExpression(), analysis)
+          && checkIsScalarExpression(binaryExpression.getRightExpression(), analysis);
+    } else if (expression instanceof UnaryExpression) {
+      return checkIsScalarExpression(((UnaryExpression) expression).getExpression(), analysis);
+    } else if (expression instanceof FunctionExpression) {
+      FunctionExpression functionExpression = (FunctionExpression) expression;
+      if (!functionExpression.isMappable(analysis.getExpressionTypes())
+          || BuiltinFunction.DEVICE_VIEW_SPECIAL_PROCESS_FUNCTIONS.contains(
+              functionExpression.getFunctionName())) {
+        return false;
+      }
+      List<Expression> inputExpressions = functionExpression.getExpressions();
+      for (Expression inputExpression : inputExpressions) {
+        if (!checkIsScalarExpression(inputExpression, analysis)) {
+          return false;
+        }
+      }
+      return true;
+    } else if (expression instanceof LeafOperand) {
+      return true;
+    } else {
+      throw new IllegalArgumentException(
+          "unsupported expression type: " + expression.getExpressionType());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
index 44a131ec02..d1f15a9534 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/memory/StatementMemorySourceVisitor.java
@@ -92,8 +92,11 @@ public class StatementMemorySourceVisitor
             .plan(context.getAnalysis());
     DistributionPlanner planner = new DistributionPlanner(context.getAnalysis(), logicalPlan);
     PlanNode rootWithExchange = planner.addExchangeNode(planner.rewriteSource());
+    PlanNode optimizedRootWithExchange = planner.optimize(rootWithExchange);
+
     List<String> lines =
-        rootWithExchange.accept(new PlanGraphPrinter(), new PlanGraphPrinter.GraphContext());
+        optimizedRootWithExchange.accept(
+            new PlanGraphPrinter(), new PlanGraphPrinter.GraphContext());
 
     TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.TEXT));
     lines.forEach(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java
index 170e21db1a..7ac2d0e6bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/ExpressionFactory.java
@@ -47,6 +47,10 @@ public class ExpressionFactory {
     return new TimeSeriesOperand(path);
   }
 
+  public static TimeSeriesOperand timeSeries(PartialPath path) {
+    return new TimeSeriesOperand(path);
+  }
+
   public static ConstantOperand constant(TSDataType dataType, String valueString) {
     return new ConstantOperand(dataType, valueString);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDown.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDown.java
new file mode 100644
index 0000000000..a2b55ca175
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDown.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.optimization;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.statement.StatementType;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+
+/**
+ * <b>Optimization phase:</b> Distributed plan planning
+ *
+ * <p><b>Rule:</b> The LIMIT OFFSET condition can be pushed down to the SeriesScanNode, when the
+ * following conditions are met:
+ * <li>Time series query (not aggregation query).
+ * <li>The query expressions are all scalar expression.
+ * <li>Functions that need to be calculated based on before or after values are not used, such as
+ *     trend functions, FILL(previous), FILL(linear).
+ * <li>Only one scan node is included in the distributed plan. That is, only one single series or a
+ *     group of series under an aligned device is queried, and all queried data is in one region.
+ */
+public class LimitOffsetPushDown implements PlanOptimizer {
+
+  @Override
+  public PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context) {
+    if (analysis.getStatement().getType() != StatementType.QUERY) {
+      return plan;
+    }
+    QueryStatement queryStatement = (QueryStatement) analysis.getStatement();
+    if (queryStatement.isAggregationQuery()
+        || (!queryStatement.hasLimit() && !queryStatement.hasOffset())) {
+      return plan;
+    }
+    return plan.accept(new Rewriter(), new RewriterContext(analysis));
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> {
+
+    @Override
+    public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+      for (PlanNode child : node.getChildren()) {
+        context.setParent(node);
+        child.accept(this, context);
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitLimit(LimitNode node, RewriterContext context) {
+      PlanNode parent = context.getParent();
+
+      context.setParent(node);
+      context.setLimit(node.getLimit());
+      node.getChild().accept(this, context);
+
+      if (context.isEnablePushDown()) {
+        return concatParentWithChild(parent, node.getChild());
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitOffset(OffsetNode node, RewriterContext context) {
+      PlanNode parent = context.getParent();
+
+      context.setParent(node);
+      context.setOffset(node.getOffset());
+      node.getChild().accept(this, context);
+
+      if (context.isEnablePushDown()) {
+        return concatParentWithChild(parent, node.getChild());
+      }
+      return node;
+    }
+
+    private PlanNode concatParentWithChild(PlanNode parent, PlanNode child) {
+      if (parent != null) {
+        ((SingleChildProcessNode) parent).setChild(child);
+        return parent;
+      } else {
+        return child;
+      }
+    }
+
+    @Override
+    public PlanNode visitFill(FillNode node, RewriterContext context) {
+      FillPolicy fillPolicy = node.getFillDescriptor().getFillPolicy();
+      if (fillPolicy == FillPolicy.VALUE) {
+        node.getChild().accept(this, context);
+      } else {
+        context.setEnablePushDown(false);
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitFilter(FilterNode node, RewriterContext context) {
+      // Value filtering push-down occurs during the logical planning phase. If there is still a
+      // FilterNode here, it means that there are query filter conditions that cannot be pushed
+      // down.
+      context.setEnablePushDown(false);
+      return node;
+    }
+
+    @Override
+    public PlanNode visitTransform(TransformNode node, RewriterContext context) {
+      Expression[] outputExpressions = node.getOutputExpressions();
+      boolean enablePushDown = true;
+      for (Expression expression : outputExpressions) {
+        if (!ExpressionAnalyzer.checkIsScalarExpression(expression, context.getAnalysis())) {
+          enablePushDown = false;
+          break;
+        }
+      }
+
+      if (enablePushDown) {
+        node.getChild().accept(this, context);
+      } else {
+        context.setEnablePushDown(false);
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitMultiChildProcess(MultiChildProcessNode node, RewriterContext context) {
+      context.setEnablePushDown(false);
+      return node;
+    }
+
+    @Override
+    public PlanNode visitDeviceView(DeviceViewNode node, RewriterContext context) {
+      if (node.getChildren().size() == 1) {
+        node.getChildren().get(0).accept(this, context);
+        return node;
+      } else {
+        return visitMultiChildProcess(node, context);
+      }
+    }
+
+    @Override
+    public PlanNode visitSeriesScan(SeriesScanNode node, RewriterContext context) {
+      if (context.isEnablePushDown()) {
+        node.setLimit(context.getLimit());
+        node.setOffset(context.getOffset());
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, RewriterContext context) {
+      if (context.isEnablePushDown()) {
+        node.setLimit(context.getLimit());
+        node.setOffset(context.getOffset());
+      }
+      return node;
+    }
+  }
+
+  private static class RewriterContext {
+    private long limit;
+    private long offset;
+
+    private boolean enablePushDown = true;
+
+    private PlanNode parent;
+
+    private final Analysis analysis;
+
+    public RewriterContext(Analysis analysis) {
+      this.analysis = analysis;
+    }
+
+    public long getLimit() {
+      return limit;
+    }
+
+    public void setLimit(long limit) {
+      this.limit = limit;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    public void setOffset(long offset) {
+      this.offset = offset;
+    }
+
+    public boolean isEnablePushDown() {
+      return enablePushDown;
+    }
+
+    public void setEnablePushDown(boolean enablePushDown) {
+      this.enablePushDown = enablePushDown;
+    }
+
+    public PlanNode getParent() {
+      return parent;
+    }
+
+    public void setParent(PlanNode parent) {
+      this.parent = parent;
+    }
+
+    public Analysis getAnalysis() {
+      return analysis;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java
index 20d6d37a1d..a2b1f860ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/optimization/PlanOptimizer.java
@@ -19,8 +19,9 @@
 package org.apache.iotdb.db.mpp.plan.optimization;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 
 public interface PlanOptimizer {
-  PlanNode optimize(PlanNode plan, MPPQueryContext context);
+  PlanNode optimize(PlanNode plan, Analysis analysis, MPPQueryContext context);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 2a80908424..08791b8217 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -1336,12 +1336,12 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
   }
 
   // parse LIMIT & OFFSET
-  private int parseLimitClause(IoTDBSqlParser.LimitClauseContext ctx) {
-    int limit;
+  private long parseLimitClause(IoTDBSqlParser.LimitClauseContext ctx) {
+    long limit;
     try {
-      limit = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
+      limit = Long.parseLong(ctx.INTEGER_LITERAL().getText());
     } catch (NumberFormatException e) {
-      throw new SemanticException("Out of range. LIMIT <N>: N should be Int32.");
+      throw new SemanticException("Out of range. LIMIT <N>: N should be Int64.");
     }
     if (limit <= 0) {
       throw new SemanticException("LIMIT <N>: N should be greater than 0.");
@@ -1349,13 +1349,13 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
     return limit;
   }
 
-  private int parseOffsetClause(IoTDBSqlParser.OffsetClauseContext ctx) {
-    int offset;
+  private long parseOffsetClause(IoTDBSqlParser.OffsetClauseContext ctx) {
+    long offset;
     try {
-      offset = Integer.parseInt(ctx.INTEGER_LITERAL().getText());
+      offset = Long.parseLong(ctx.INTEGER_LITERAL().getText());
     } catch (NumberFormatException e) {
       throw new SemanticException(
-          "Out of range. OFFSET <OFFSETValue>: OFFSETValue should be Int32.");
+          "Out of range. OFFSET <OFFSETValue>: OFFSETValue should be Int64.");
     }
     if (offset < 0) {
       throw new SemanticException("OFFSET <OFFSETValue>: OFFSETValue should >= 0.");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index d3893276a8..f2ba699208 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -880,7 +880,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  public LogicalPlanBuilder planLimit(int rowLimit) {
+  public LogicalPlanBuilder planLimit(long rowLimit) {
     if (rowLimit == 0) {
       return this;
     }
@@ -889,7 +889,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  public LogicalPlanBuilder planOffset(int rowOffset) {
+  public LogicalPlanBuilder planOffset(long rowOffset) {
     if (rowOffset == 0) {
       return this;
     }
@@ -963,8 +963,8 @@ public class LogicalPlanBuilder {
       PartialPath pathPattern,
       String key,
       String value,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       boolean orderByHeat,
       boolean contains,
       boolean prefixPath,
@@ -985,7 +985,7 @@ public class LogicalPlanBuilder {
   }
 
   public LogicalPlanBuilder planDeviceSchemaSource(
-      PartialPath pathPattern, int limit, int offset, boolean prefixPath, boolean hasSgCol) {
+      PartialPath pathPattern, long limit, long offset, boolean prefixPath, boolean hasSgCol) {
     this.root =
         new DevicesSchemaScanNode(
             context.getQueryId().genPlanNodeId(), pathPattern, limit, offset, prefixPath, hasSgCol);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 7696aab20a..a5453c348b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -466,8 +466,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
             && analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1
             && !showTimeSeriesStatement.isOrderByHeat();
 
-    int limit = showTimeSeriesStatement.getLimit();
-    int offset = showTimeSeriesStatement.getOffset();
+    long limit = showTimeSeriesStatement.getLimit();
+    long offset = showTimeSeriesStatement.getOffset();
     if (showTimeSeriesStatement.isOrderByHeat()) {
       limit = 0;
       offset = 0;
@@ -523,8 +523,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
         analysis.getSchemaPartitionInfo() != null
             && analysis.getSchemaPartitionInfo().getDistributionInfo().size() == 1;
 
-    int limit = showDevicesStatement.getLimit();
-    int offset = showDevicesStatement.getOffset();
+    long limit = showDevicesStatement.getLimit();
+    long offset = showDevicesStatement.getOffset();
     if (!canPushDownOffsetLimit) {
       limit = showDevicesStatement.getLimit() + showDevicesStatement.getOffset();
       offset = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index b3f65eec7c..f55d695b2d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -50,7 +50,7 @@ public class LogicalPlanner {
           .recordPlanCost(LOGICAL_PLANNER, System.nanoTime() - startTime);
 
       for (PlanOptimizer optimizer : optimizers) {
-        rootNode = optimizer.optimize(rootNode, context);
+        rootNode = optimizer.optimize(rootNode, analysis, context);
       }
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index f8ef1e525b..f56794b004 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -290,6 +290,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     }
     seriesScanOptionsBuilder.withAllSensors(
         context.getAllSensors(seriesPath.getDevice(), seriesPath.getMeasurement()));
+    seriesScanOptionsBuilder.withLimit(node.getLimit());
+    seriesScanOptionsBuilder.withOffset(node.getOffset());
 
     SeriesScanOperator seriesScanOperator =
         new SeriesScanOperator(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 422759a807..73fe5c5259 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.exchange.sink.DownStreamChannelLocation;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.optimization.LimitOffsetPushDown;
+import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -40,7 +42,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.ShuffleSinkNode;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderByComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 
 import org.apache.commons.lang3.Validate;
 
@@ -57,10 +58,15 @@ public class DistributionPlanner {
   private MPPQueryContext context;
   private LogicalQueryPlan logicalPlan;
 
+  private final List<PlanOptimizer> optimizers;
+
+  private int planFragmentIndex = 0;
+
   public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
     this.analysis = analysis;
     this.logicalPlan = logicalPlan;
     this.context = logicalPlan.getContext();
+    this.optimizers = Collections.singletonList(new LimitOffsetPushDown());
   }
 
   public PlanNode rewriteSource() {
@@ -152,6 +158,15 @@ public class DistributionPlanner {
             || orderByComponent.getSortItemList().get(0).getSortKey().equals(SortKey.DEVICE));
   }
 
+  public PlanNode optimize(PlanNode rootWithExchange) {
+    if (analysis.getStatement() != null && analysis.getStatement().isQuery()) {
+      for (PlanOptimizer optimizer : optimizers) {
+        rootWithExchange = optimizer.optimize(rootWithExchange, analysis, context);
+      }
+    }
+    return rootWithExchange;
+  }
+
   public SubPlan splitFragment(PlanNode root) {
     FragmentBuilder fragmentBuilder = new FragmentBuilder(context);
     return fragmentBuilder.splitToSubPlan(root);
@@ -160,13 +175,13 @@ public class DistributionPlanner {
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
-    if (analysis.getStatement() instanceof QueryStatement
-        || analysis.getStatement() instanceof ShowQueriesStatement) {
+    if (analysis.getStatement() != null && analysis.getStatement().isQuery()) {
       analysis
           .getRespDatasetHeader()
           .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
     }
-    SubPlan subPlan = splitFragment(rootWithExchange);
+    PlanNode optimizedRootWithExchange = optimize(rootWithExchange);
+    SubPlan subPlan = splitFragment(optimizedRootWithExchange);
     // Mark the root Fragment of root SubPlan as `root`
     subPlan.getPlanFragment().setRoot(true);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index c0a839cc6e..a9726d0c2f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -96,6 +96,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
     boxValue.add(String.format("SeriesScan-%s", node.getPlanNodeId().getId()));
     boxValue.add(String.format("Series: %s", node.getSeriesPath()));
     boxValue.add(String.format("TimeFilter: %s", node.getTimeFilter()));
+
+    long limit = node.getLimit(), offset = node.getOffset();
+    if (limit > 0) {
+      boxValue.add(String.format("Limit: %s", limit));
+    }
+    if (offset > 0) {
+      boxValue.add(String.format("Offset: %s", offset));
+    }
     boxValue.add(printRegion(node.getRegionReplicaSet()));
     return render(node, boxValue, context);
   }
@@ -109,6 +117,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
             "Series: %s%s",
             node.getAlignedPath().getDevice(), node.getAlignedPath().getMeasurementList()));
     boxValue.add(String.format("TimeFilter: %s", node.getTimeFilter()));
+
+    long limit = node.getLimit(), offset = node.getOffset();
+    if (limit > 0) {
+      boxValue.add(String.format("Limit: %s", limit));
+    }
+    if (offset > 0) {
+      boxValue.add(String.format("Offset: %s", offset));
+    }
     boxValue.add(printRegion(node.getRegionReplicaSet()));
     return render(node, boxValue, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 9356ddc742..cf3fbe02af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -60,8 +60,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.HorizontallyConcat
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
@@ -79,6 +81,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.ShowQueriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
@@ -94,76 +97,88 @@ public abstract class PlanVisitor<R, C> {
 
   public abstract R visitPlan(PlanNode node, C context);
 
-  public R visitSeriesScan(SeriesScanNode node, C context) {
+  public R visitSourceNode(SourceNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitSeriesAggregationScan(SeriesAggregationScanNode node, C context) {
+  public R visitSingleChildProcess(SingleChildProcessNode node, C context) {
     return visitPlan(node, context);
   }
 
-  public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
+  public R visitMultiChildProcess(MultiChildProcessNode node, C context) {
     return visitPlan(node, context);
   }
 
+  public R visitSeriesScan(SeriesScanNode node, C context) {
+    return visitSourceNode(node, context);
+  }
+
+  public R visitSeriesAggregationScan(SeriesAggregationScanNode node, C context) {
+    return visitSourceNode(node, context);
+  }
+
+  public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
+    return visitSourceNode(node, context);
+  }
+
   public R visitAlignedSeriesAggregationScan(AlignedSeriesAggregationScanNode node, C context) {
-    return visitPlan(node, context);
+    return visitSourceNode(node, context);
   }
 
   public R visitDeviceView(DeviceViewNode node, C context) {
-    return visitPlan(node, context);
+    return visitMultiChildProcess(node, context);
   }
 
   public R visitDeviceMerge(DeviceMergeNode node, C context) {
-    return visitPlan(node, context);
+    return visitMultiChildProcess(node, context);
   }
 
   public R visitFill(FillNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitFilter(FilterNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitGroupByLevel(GroupByLevelNode node, C context) {
-    return visitPlan(node, context);
+    return visitMultiChildProcess(node, context);
   }
 
   public R visitGroupByTag(GroupByTagNode node, C context) {
-    return visitPlan(node, context);
+    return visitMultiChildProcess(node, context);
   }
 
   public R visitSlidingWindowAggregation(SlidingWindowAggregationNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitLimit(LimitNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitOffset(OffsetNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitAggregation(AggregationNode node, C context) {
-    return visitPlan(node, context);
+    return visitMultiChildProcess(node, context);
   }
 
   public R visitSort(SortNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitProject(ProjectNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitTimeJoin(TimeJoinNode node, C context) {
-    return visitPlan(node, context);
+    return visitMultiChildProcess(node, context);
   }
 
   public R visitExchange(ExchangeNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitSchemaQueryMerge(SchemaQueryMergeNode node, C context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
index 2dae4bd591..73307996cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
@@ -41,8 +41,8 @@ public class DevicesSchemaScanNode extends SchemaQueryScanNode {
   public DevicesSchemaScanNode(
       PlanNodeId id,
       PartialPath path,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       boolean isPrefixPath,
       boolean hasSgCol) {
     super(id, path, limit, offset, isPrefixPath);
@@ -98,8 +98,8 @@ public class DevicesSchemaScanNode extends SchemaQueryScanNode {
     } catch (IllegalPathException e) {
       throw new IllegalArgumentException("Cannot deserialize DevicesSchemaScanNode", e);
     }
-    int limit = ReadWriteIOUtils.readInt(byteBuffer);
-    int offset = ReadWriteIOUtils.readInt(byteBuffer);
+    long limit = ReadWriteIOUtils.readLong(byteBuffer);
+    long offset = ReadWriteIOUtils.readLong(byteBuffer);
     boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
     boolean hasSgCol = ReadWriteIOUtils.readBool(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java
index 7d71b7f64f..69af27aca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaQueryScanNode.java
@@ -31,8 +31,8 @@ import java.util.List;
 import java.util.Objects;
 
 public abstract class SchemaQueryScanNode extends SourceNode {
-  protected int limit;
-  protected int offset;
+  protected long limit;
+  protected long offset;
   protected PartialPath path;
   private boolean hasLimit;
   protected boolean isPrefixPath;
@@ -44,7 +44,7 @@ public abstract class SchemaQueryScanNode extends SourceNode {
   }
 
   protected SchemaQueryScanNode(
-      PlanNodeId id, PartialPath partialPath, int limit, int offset, boolean isPrefixPath) {
+      PlanNodeId id, PartialPath partialPath, long limit, long offset, boolean isPrefixPath) {
     super(id);
     this.path = partialPath;
     setLimit(limit);
@@ -89,11 +89,11 @@ public abstract class SchemaQueryScanNode extends SourceNode {
     return isPrefixPath;
   }
 
-  public int getLimit() {
+  public long getLimit() {
     return limit;
   }
 
-  public void setLimit(int limit) {
+  public void setLimit(long limit) {
     this.limit = limit;
     if (limit == 0) {
       hasLimit = false;
@@ -112,7 +112,7 @@ public abstract class SchemaQueryScanNode extends SourceNode {
     this.schemaRegionReplicaSet = schemaRegionReplicaSet;
   }
 
-  public int getOffset() {
+  public long getOffset() {
     return offset;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
index 683abeabb8..29d080644c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
@@ -72,8 +72,8 @@ public class TimeSeriesSchemaScanNode extends SchemaQueryScanNode {
       PartialPath partialPath,
       String key,
       String value,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       boolean orderByHeat,
       boolean isContains,
       boolean isPrefixPath,
@@ -132,8 +132,8 @@ public class TimeSeriesSchemaScanNode extends SchemaQueryScanNode {
     }
     String key = ReadWriteIOUtils.readString(byteBuffer);
     String value = ReadWriteIOUtils.readString(byteBuffer);
-    int limit = ReadWriteIOUtils.readInt(byteBuffer);
-    int offset = ReadWriteIOUtils.readInt(byteBuffer);
+    long limit = ReadWriteIOUtils.readLong(byteBuffer);
+    long offset = ReadWriteIOUtils.readLong(byteBuffer);
     boolean oderByHeat = ReadWriteIOUtils.readBool(byteBuffer);
     boolean isContains = ReadWriteIOUtils.readBool(byteBuffer);
     boolean isPrefixPath = ReadWriteIOUtils.readBool(byteBuffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
index ae03e13f70..df9387397c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/LimitNode.java
@@ -33,19 +33,19 @@ import java.util.Objects;
 /** LimitNode is used to select top n result. It uses the default order of upstream nodes */
 public class LimitNode extends SingleChildProcessNode {
 
-  private final int limit;
+  private final long limit;
 
-  public LimitNode(PlanNodeId id, int limit) {
+  public LimitNode(PlanNodeId id, long limit) {
     super(id);
     this.limit = limit;
   }
 
-  public LimitNode(PlanNodeId id, PlanNode child, int limit) {
+  public LimitNode(PlanNodeId id, PlanNode child, long limit) {
     super(id, child);
     this.limit = limit;
   }
 
-  public int getLimit() {
+  public long getLimit() {
     return limit;
   }
 
@@ -77,7 +77,7 @@ public class LimitNode extends SingleChildProcessNode {
   }
 
   public static LimitNode deserialize(ByteBuffer byteBuffer) {
-    int limit = ReadWriteIOUtils.readInt(byteBuffer);
+    long limit = ReadWriteIOUtils.readLong(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new LimitNode(planNodeId, limit);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
index 64b912303b..59151bbb51 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/OffsetNode.java
@@ -36,14 +36,14 @@ import java.util.Objects;
  */
 public class OffsetNode extends SingleChildProcessNode {
 
-  private final int offset;
+  private final long offset;
 
-  public OffsetNode(PlanNodeId id, int offset) {
+  public OffsetNode(PlanNodeId id, long offset) {
     super(id);
     this.offset = offset;
   }
 
-  public OffsetNode(PlanNodeId id, PlanNode child, int offset) {
+  public OffsetNode(PlanNodeId id, PlanNode child, long offset) {
     super(id, child);
     this.offset = offset;
   }
@@ -76,12 +76,12 @@ public class OffsetNode extends SingleChildProcessNode {
   }
 
   public static OffsetNode deserialize(ByteBuffer byteBuffer) {
-    int offset = ReadWriteIOUtils.readInt(byteBuffer);
+    long offset = ReadWriteIOUtils.readLong(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new OffsetNode(planNodeId, offset);
   }
 
-  public int getOffset() {
+  public long getOffset() {
     return offset;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
index 3adf0b0618..09387d0e5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/AlignedSeriesScanNode.java
@@ -62,10 +62,10 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
   @Nullable private Filter valueFilter;
 
   // Limit for result set. The default value is -1, which means no limit
-  private int limit;
+  private long limit;
 
   // offset for result set. The default value is 0
-  private int offset;
+  private long offset;
 
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
@@ -86,8 +86,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
       Ordering scanOrder,
       @Nullable Filter timeFilter,
       @Nullable Filter valueFilter,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       TRegionReplicaSet dataRegionReplicaSet) {
     this(id, alignedPath, scanOrder);
     this.timeFilter = timeFilter;
@@ -119,14 +119,22 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
     return valueFilter;
   }
 
-  public int getLimit() {
+  public long getLimit() {
     return limit;
   }
 
-  public int getOffset() {
+  public long getOffset() {
     return offset;
   }
 
+  public void setLimit(long limit) {
+    this.limit = limit;
+  }
+
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
   @Override
   public void open() throws Exception {}
 
@@ -241,8 +249,8 @@ public class AlignedSeriesScanNode extends SeriesSourceNode {
     if (isNull == 1) {
       valueFilter = FilterFactory.deserialize(byteBuffer);
     }
-    int limit = ReadWriteIOUtils.readInt(byteBuffer);
-    int offset = ReadWriteIOUtils.readInt(byteBuffer);
+    long limit = ReadWriteIOUtils.readLong(byteBuffer);
+    long offset = ReadWriteIOUtils.readLong(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new AlignedSeriesScanNode(
         planNodeId, alignedPath, scanOrder, timeFilter, valueFilter, limit, offset, null);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
index 637aad52ec..6458d5af0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesScanNode.java
@@ -66,10 +66,10 @@ public class SeriesScanNode extends SeriesSourceNode {
   @Nullable private Filter valueFilter;
 
   // Limit for result set. The default value is -1, which means no limit
-  private int limit;
+  private long limit;
 
   // offset for result set. The default value is 0
-  private int offset;
+  private long offset;
 
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
@@ -90,8 +90,8 @@ public class SeriesScanNode extends SeriesSourceNode {
       Ordering scanOrder,
       @Nullable Filter timeFilter,
       @Nullable Filter valueFilter,
-      int limit,
-      int offset,
+      long limit,
+      long offset,
       TRegionReplicaSet dataRegionReplicaSet) {
     this(id, seriesPath, scanOrder);
     this.timeFilter = timeFilter;
@@ -117,19 +117,19 @@ public class SeriesScanNode extends SeriesSourceNode {
     this.regionReplicaSet = dataRegion;
   }
 
-  public int getLimit() {
+  public long getLimit() {
     return limit;
   }
 
-  public int getOffset() {
+  public long getOffset() {
     return offset;
   }
 
-  public void setLimit(int limit) {
+  public void setLimit(long limit) {
     this.limit = limit;
   }
 
-  public void setOffset(int offset) {
+  public void setOffset(long offset) {
     this.offset = offset;
   }
 
@@ -252,8 +252,8 @@ public class SeriesScanNode extends SeriesSourceNode {
     if (isNull == 1) {
       valueFilter = FilterFactory.deserialize(byteBuffer);
     }
-    int limit = ReadWriteIOUtils.readInt(byteBuffer);
-    int offset = ReadWriteIOUtils.readInt(byteBuffer);
+    long limit = ReadWriteIOUtils.readLong(byteBuffer);
+    long offset = ReadWriteIOUtils.readLong(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new SeriesScanNode(
         planNodeId, partialPath, scanOrder, timeFilter, valueFilter, limit, offset, null);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java
index a3730c89a0..3962bae22b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/Statement.java
@@ -55,7 +55,7 @@ public abstract class Statement extends StatementNode {
   }
 
   public boolean isQuery() {
-    return statementType == StatementType.QUERY;
+    return false;
   }
 
   public boolean isAuthenticationRequired() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index ef9550a7ec..480ab2a026 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -78,14 +78,14 @@ public class QueryStatement extends Statement {
   private HavingCondition havingCondition;
 
   // row limit for result set. The default value is 0, which means no limit
-  private int rowLimit = 0;
+  private long rowLimit = 0;
   // row offset for result set. The default value is 0
-  private int rowOffset = 0;
+  private long rowOffset = 0;
 
   // series limit and offset for result set. The default value is 0, which means no limit
-  private int seriesLimit = 0;
+  private long seriesLimit = 0;
   // series offset for result set. The default value is 0
-  private int seriesOffset = 0;
+  private long seriesOffset = 0;
 
   private FillComponent fillComponent;
 
@@ -116,6 +116,11 @@ public class QueryStatement extends Statement {
     this.statementType = StatementType.QUERY;
   }
 
+  @Override
+  public boolean isQuery() {
+    return true;
+  }
+
   @Override
   public List<PartialPath> getPaths() {
     Set<PartialPath> authPaths = new HashSet<>();
@@ -168,35 +173,35 @@ public class QueryStatement extends Statement {
     this.havingCondition = havingCondition;
   }
 
-  public int getRowLimit() {
+  public long getRowLimit() {
     return rowLimit;
   }
 
-  public void setRowLimit(int rowLimit) {
+  public void setRowLimit(long rowLimit) {
     this.rowLimit = rowLimit;
   }
 
-  public int getRowOffset() {
+  public long getRowOffset() {
     return rowOffset;
   }
 
-  public void setRowOffset(int rowOffset) {
+  public void setRowOffset(long rowOffset) {
     this.rowOffset = rowOffset;
   }
 
-  public int getSeriesLimit() {
+  public long getSeriesLimit() {
     return seriesLimit;
   }
 
-  public void setSeriesLimit(int seriesLimit) {
+  public void setSeriesLimit(long seriesLimit) {
     this.seriesLimit = seriesLimit;
   }
 
-  public int getSeriesOffset() {
+  public long getSeriesOffset() {
     return seriesOffset;
   }
 
-  public void setSeriesOffset(int seriesOffset) {
+  public void setSeriesOffset(long seriesOffset) {
     this.seriesOffset = seriesOffset;
   }
 
@@ -373,6 +378,14 @@ public class QueryStatement extends Statement {
     isCqQueryBody = cqQueryBody;
   }
 
+  public boolean hasLimit() {
+    return rowLimit > 0;
+  }
+
+  public boolean hasOffset() {
+    return rowOffset > 0;
+  }
+
   public void semanticCheck() {
     if (isAggregationQuery()) {
       if (disableAlign()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java
index 014f294555..87c6576a6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowStatement.java
@@ -28,8 +28,8 @@ import java.util.List;
 
 public class ShowStatement extends Statement {
 
-  int limit = 0;
-  int offset = 0;
+  long limit = 0;
+  long offset = 0;
 
   protected boolean isPrefixPath;
 
@@ -43,19 +43,19 @@ public class ShowStatement extends Statement {
     return Collections.emptyList();
   }
 
-  public int getLimit() {
+  public long getLimit() {
     return limit;
   }
 
-  public void setLimit(int limit) {
+  public void setLimit(long limit) {
     this.limit = limit;
   }
 
-  public int getOffset() {
+  public long getOffset() {
     return offset;
   }
 
-  public void setOffset(int offset) {
+  public void setOffset(long offset) {
     this.offset = offset;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
index 1b3439519d..c6f266d463 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/sys/ShowQueriesStatement.java
@@ -37,13 +37,18 @@ public class ShowQueriesStatement extends ShowStatement {
 
   private OrderByComponent orderByComponent;
 
-  private int rowLimit;
-  private int rowOffset;
+  private long rowLimit;
+  private long rowOffset;
 
   private ZoneId zoneId;
 
   public ShowQueriesStatement() {}
 
+  @Override
+  public boolean isQuery() {
+    return true;
+  }
+
   @Override
   public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
     return visitor.visitShowQueries(this, context);
@@ -73,19 +78,19 @@ public class ShowQueriesStatement extends ShowStatement {
     return orderByComponent.getSortItemList();
   }
 
-  public void setRowLimit(int rowLimit) {
+  public void setRowLimit(long rowLimit) {
     this.rowLimit = rowLimit;
   }
 
-  public int getRowLimit() {
+  public long getRowLimit() {
     return rowLimit;
   }
 
-  public void setRowOffset(int rowOffset) {
+  public void setRowOffset(long rowOffset) {
     this.rowOffset = rowOffset;
   }
 
-  public int getRowOffset() {
+  public long getRowOffset() {
     return rowOffset;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDownTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDownTest.java
new file mode 100644
index 0000000000..a473771a8f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/LimitOffsetPushDownTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.optimization;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.plan.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.mpp.plan.analyze.FakeSchemaFetcherImpl;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.add;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.function;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.gt;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.intValue;
+import static org.apache.iotdb.db.mpp.plan.expression.ExpressionFactory.timeSeries;
+
+public class LimitOffsetPushDownTest {
+
+  private static final Map<String, PartialPath> schemaMap = new HashMap<>();
+
+  static {
+    try {
+      schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32));
+      schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE));
+      schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32));
+      schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE));
+
+      MeasurementPath aS1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32);
+      MeasurementPath aS2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE);
+      AlignedPath alignedPath =
+          new AlignedPath(
+              "root.sg.d2.a",
+              Arrays.asList("s1", "s2"),
+              Arrays.asList(aS1.getMeasurementSchema(), aS2.getMeasurementSchema()));
+      aS1.setUnderAlignedEntity(true);
+      aS2.setUnderAlignedEntity(true);
+      schemaMap.put("root.sg.d2.a.s1", aS1);
+      schemaMap.put("root.sg.d2.a.s2", aS2);
+      schemaMap.put("root.sg.d2.a", alignedPath);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testNonAlignedPushDown() {
+    checkPushDown(
+        "select s1 from root.sg.d1 limit 100;",
+        new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1")).limit("1", 100).getRoot(),
+        new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1"), 100, 0).getRoot());
+    checkPushDown(
+        "select s1 from root.sg.d1 offset 100;",
+        new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1")).offset("1", 100).getRoot(),
+        new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1"), 0, 100).getRoot());
+    checkPushDown(
+        "select s1 from root.sg.d1 limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .offset("1", 100)
+            .limit("2", 100)
+            .getRoot(),
+        new TestPlanBuilder().scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100).getRoot());
+  }
+
+  @Test
+  public void testAlignedPushDown() {
+    checkPushDown(
+        "select s1, s2 from root.sg.d2.a limit 100;",
+        new TestPlanBuilder()
+            .scanAligned("0", schemaMap.get("root.sg.d2.a"))
+            .limit("1", 100)
+            .getRoot(),
+        new TestPlanBuilder().scanAligned("0", schemaMap.get("root.sg.d2.a"), 100, 0).getRoot());
+    checkPushDown(
+        "select s1, s2 from root.sg.d2.a offset 100;",
+        new TestPlanBuilder()
+            .scanAligned("0", schemaMap.get("root.sg.d2.a"))
+            .offset("1", 100)
+            .getRoot(),
+        new TestPlanBuilder().scanAligned("0", schemaMap.get("root.sg.d2.a"), 0, 100).getRoot());
+    checkPushDown(
+        "select s1, s2 from root.sg.d2.a limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scanAligned("0", schemaMap.get("root.sg.d2.a"))
+            .offset("1", 100)
+            .limit("2", 100)
+            .getRoot(),
+        new TestPlanBuilder().scanAligned("0", schemaMap.get("root.sg.d2.a"), 100, 100).getRoot());
+  }
+
+  @Test
+  public void testPushDownWithTransform() {
+    List<Expression> expressions =
+        Arrays.asList(
+            add(
+                function("sin", add(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("1"))),
+                intValue("1")),
+            gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")));
+    checkPushDown(
+        "select sin(s1 + 1) + 1, s1 > 10 from root.sg.d1 limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .transform("1", expressions)
+            .offset("2", 100)
+            .limit("3", 100)
+            .getRoot(),
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+            .transform("1", expressions)
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownWithFill() {
+    checkPushDown(
+        "select s1 from root.sg.d1 fill(100) limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .fill("1", "100")
+            .offset("2", 100)
+            .limit("3", 100)
+            .getRoot(),
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+            .fill("1", "100")
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownAlignByDevice() {
+    checkPushDown(
+        "select s1 from root.sg.d1 limit 100 offset 100 align by device;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .singleDeviceView("1", "root.sg.d1", "s1")
+            .offset("2", 100)
+            .limit("3", 100)
+            .getRoot(),
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+            .singleDeviceView("1", "root.sg.d1", "s1")
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownWithInto() {
+    checkPushDown(
+        "select s1 into root.sg.d2(s1) from root.sg.d1 limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .offset("1", 100)
+            .limit("2", 100)
+            .into("3", schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d2.s1"))
+            .getRoot(),
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"), 100, 100)
+            .into("3", schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d2.s1"))
+            .getRoot());
+  }
+
+  @Test
+  public void testCannotPushDown() {
+    checkCannotPushDown(
+        "select s1, s2 from root.sg.d1 limit 100;",
+        new TestPlanBuilder()
+            .timeJoin(Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2")))
+            .limit("3", 100)
+            .getRoot());
+
+    checkCannotPushDown(
+        "select diff(s1 + 1) + 1 from root.sg.d1 limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .transform(
+                "1",
+                Collections.singletonList(
+                    add(
+                        function(
+                            "diff", add(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("1"))),
+                        intValue("1"))))
+            .offset("2", 100)
+            .limit("3", 100)
+            .getRoot());
+
+    LinkedHashMap<String, String> functionAttributes = new LinkedHashMap<>();
+    functionAttributes.put("windowSize", "10");
+    checkCannotPushDown(
+        "select m4(s1,'windowSize'='10') from root.sg.d1 limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .transform(
+                "1",
+                Collections.singletonList(
+                    function("m4", functionAttributes, timeSeries(schemaMap.get("root.sg.d1.s1")))))
+            .offset("2", 100)
+            .limit("3", 100)
+            .getRoot());
+
+    checkCannotPushDown(
+        "select s1 from root.sg.d1 fill(linear) limit 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .fill("1", FillPolicy.LINEAR)
+            .limit("2", 100)
+            .getRoot());
+    checkCannotPushDown(
+        "select s1 from root.sg.d1 fill(previous) limit 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .fill("1", FillPolicy.PREVIOUS)
+            .limit("2", 100)
+            .getRoot());
+
+    checkCannotPushDown(
+        "select s1 from root.sg.d1 where s1 > 10 limit 100 offset 100;",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .filter(
+                "1",
+                Collections.singletonList(timeSeries(schemaMap.get("root.sg.d1.s1"))),
+                gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")))
+            .offset("2", 100)
+            .limit("3", 100)
+            .getRoot());
+  }
+
+  private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) {
+    Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+
+    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+    Analyzer analyzer =
+        new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+    Analysis analysis = analyzer.analyze(statement);
+
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    PlanNode actualPlan = planner.plan(analysis).getRootNode();
+    Assert.assertEquals(rawPlan, actualPlan);
+
+    PlanNode actualOptPlan = new LimitOffsetPushDown().optimize(actualPlan, analysis, context);
+    Assert.assertEquals(optPlan, actualOptPlan);
+  }
+
+  private void checkCannotPushDown(String sql, PlanNode rawPlan) {
+    Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+
+    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+    Analyzer analyzer =
+        new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl());
+    Analysis analysis = analyzer.analyze(statement);
+
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    PlanNode actualPlan = planner.plan(analysis).getRootNode();
+
+    Assert.assertEquals(rawPlan, actualPlan);
+    Assert.assertEquals(
+        actualPlan, new LimitOffsetPushDown().optimize(actualPlan, analysis, context));
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
new file mode 100644
index 0000000000..c563c16e89
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/optimization/TestPlanBuilder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.optimization;
+
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.mpp.plan.statement.literal.LongLiteral;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.DEVICE;
+
+public class TestPlanBuilder {
+
+  private PlanNode root;
+
+  public TestPlanBuilder() {}
+
+  public PlanNode getRoot() {
+    return root;
+  }
+
+  public TestPlanBuilder scan(String id, PartialPath path) {
+    this.root = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path);
+    return this;
+  }
+
+  public TestPlanBuilder scanAligned(String id, PartialPath path) {
+    this.root = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path);
+    return this;
+  }
+
+  public TestPlanBuilder scan(String id, PartialPath path, int limit, int offset) {
+    SeriesScanNode node = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path);
+    node.setLimit(limit);
+    node.setOffset(offset);
+    this.root = node;
+    return this;
+  }
+
+  public TestPlanBuilder scanAligned(String id, PartialPath path, int limit, int offset) {
+    AlignedSeriesScanNode node = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path);
+    node.setLimit(limit);
+    node.setOffset(offset);
+    this.root = node;
+    return this;
+  }
+
+  public TestPlanBuilder timeJoin(List<PartialPath> paths) {
+    int planId = 0;
+
+    List<PlanNode> seriesSourceNodes = new ArrayList<>();
+    for (PartialPath path : paths) {
+      seriesSourceNodes.add(
+          new SeriesScanNode(new PlanNodeId(String.valueOf(planId)), (MeasurementPath) path));
+      planId++;
+    }
+    this.root =
+        new TimeJoinNode(new PlanNodeId(String.valueOf(planId)), Ordering.ASC, seriesSourceNodes);
+    return this;
+  }
+
+  public TestPlanBuilder transform(String id, List<Expression> expressions) {
+    this.root =
+        new TransformNode(
+            new PlanNodeId(id),
+            getRoot(),
+            expressions.toArray(new Expression[0]),
+            false,
+            ZonedDateTime.now().getOffset(),
+            Ordering.ASC);
+    return this;
+  }
+
+  public TestPlanBuilder limit(String id, long limit) {
+    this.root = new LimitNode(new PlanNodeId(id), getRoot(), limit);
+    return this;
+  }
+
+  public TestPlanBuilder offset(String id, long offset) {
+    this.root = new OffsetNode(new PlanNodeId(id), getRoot(), offset);
+    return this;
+  }
+
+  public TestPlanBuilder fill(String id, FillPolicy fillPolicy) {
+    this.root =
+        new FillNode(new PlanNodeId(id), getRoot(), new FillDescriptor(fillPolicy), Ordering.ASC);
+    return this;
+  }
+
+  public TestPlanBuilder fill(String id, String intValue) {
+    this.root =
+        new FillNode(
+            new PlanNodeId(id),
+            getRoot(),
+            new FillDescriptor(FillPolicy.VALUE, new LongLiteral(intValue)),
+            Ordering.ASC);
+    return this;
+  }
+
+  public TestPlanBuilder singleDeviceView(String id, String device, String measurement) {
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+    deviceToMeasurementIndexesMap.put(device, Collections.singletonList(1));
+    DeviceViewNode deviceViewNode =
+        new DeviceViewNode(
+            new PlanNodeId(id),
+            new OrderByParameter(
+                Arrays.asList(
+                    new SortItem(SortKey.DEVICE, Ordering.ASC),
+                    new SortItem(SortKey.TIME, Ordering.ASC))),
+            Arrays.asList(DEVICE, measurement),
+            deviceToMeasurementIndexesMap);
+    deviceViewNode.addChildDeviceNode(device, getRoot());
+    this.root = deviceViewNode;
+    return this;
+  }
+
+  public TestPlanBuilder filter(String id, List<Expression> expressions, Expression predicate) {
+    this.root =
+        new FilterNode(
+            new PlanNodeId(id),
+            getRoot(),
+            expressions.toArray(new Expression[0]),
+            predicate,
+            false,
+            ZonedDateTime.now().getOffset(),
+            Ordering.ASC);
+    return this;
+  }
+
+  public TestPlanBuilder into(String id, PartialPath sourcePath, PartialPath intoPath) {
+    IntoPathDescriptor intoPathDescriptor = new IntoPathDescriptor();
+    intoPathDescriptor.specifyTargetPath(sourcePath.toString(), intoPath);
+    intoPathDescriptor.specifyDeviceAlignment(intoPath.getDevice(), false);
+    intoPathDescriptor.recordSourceColumnDataType(
+        sourcePath.toString(), sourcePath.getSeriesType());
+    this.root = new IntoNode(new PlanNodeId(id), getRoot(), intoPathDescriptor);
+    return this;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
index 022749581b..56ef93e10b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/LogicalPlannerTest.java
@@ -82,7 +82,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 @Ignore
-@Deprecated
 public class LogicalPlannerTest {
 
   @Test