You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/06/10 04:16:34 UTC

[shardingsphere] branch master updated: Add rewrite logic for fetch statement to reduce memory occupy (#18275)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c4231b7bc4 Add rewrite logic for fetch statement to reduce memory occupy (#18275)
2c4231b7bc4 is described below

commit 2c4231b7bc4b51d8840bfac682f4c82d4a19a871
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Fri Jun 10 12:16:25 2022 +0800

    Add rewrite logic for fetch statement to reduce memory occupy (#18275)
---
 .../merge/ddl/fetch/FetchOrderByValueGroup.java    |  31 ++---
 ...der.java => FetchOrderByValueGroupsHolder.java} |  33 +++--
 .../merge/ddl/fetch/FetchStreamMergedResult.java   |  67 ++++++++--
 .../impl/FetchDirectionTokenGenerator.java         |  54 ++++++++
 .../rewrite/token/pojo/FetchDirectionToken.java    |  65 ++++++++++
 .../token/pojo/ShardingTokenGenerateBuilder.java   |   2 +
 .../fetch/FetchOrderByValueQueuesHolderTest.java   |  22 ++--
 .../ddl/fetch/FetchStreamMergedResultTest.java     |   1 +
 .../core/connection/ShardingSphereConnection.java  |   6 +-
 .../transaction/JDBCBackendTransactionManager.java |   6 +-
 .../impl/SchemaAssignedDatabaseBackendHandler.java |   6 +-
 .../text/transaction/TransactionXAHandler.java     |   4 +-
 .../main/antlr4/imports/opengauss/DDLStatement.g4  |   2 +-
 .../parser/sql/common/constant/DirectionType.java  |  61 ++++++++-
 .../resources/scenario/sharding/case/fetch.xml     | 144 ++++++++++++++++++++-
 15 files changed, 431 insertions(+), 73 deletions(-)

diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueGroup.java
similarity index 51%
copy from shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
copy to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueGroup.java
index c97d418149a..fe39b3cfc5c 100644
--- a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueGroup.java
@@ -15,28 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sql.parser.sql.common.constant;
+package org.apache.shardingsphere.sharding.merge.ddl.fetch;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+
+import java.util.Collection;
+import java.util.LinkedList;
 
 /**
- * Direction type enum.
+ * Fetch order by value group.
  */
-public enum DirectionType {
-    
-    NEXT, PRIOR, FIRST, LAST, ABSOLUTE_COUNT, RELATIVE_COUNT, COUNT, ALL, FORWARD, FORWARD_COUNT, FORWARD_ALL, BACKWARD, BACKWARD_COUNT, BACKWARD_ALL;
-    
-    private static final Set<DirectionType> ALL_DIRECTION_TYPES = new HashSet<>(Arrays.asList(ALL, FORWARD_ALL, BACKWARD_ALL));
+@Getter
+@Setter
+public final class FetchOrderByValueGroup {
     
-    /**
-     * Is all direction type.
-     * 
-     * @param directionType direction type
-     * @return is all direction type or not
-     */
-    public static boolean isAllDirectionType(final DirectionType directionType) {
-        return ALL_DIRECTION_TYPES.contains(directionType);
-    }
+    private final Collection<OrderByValue> orderByValues = new LinkedList<>();
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueGroupsHolder.java
similarity index 55%
rename from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
rename to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueGroupsHolder.java
index 3e005f70aa4..450a5f0125e 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueGroupsHolder.java
@@ -19,45 +19,44 @@ package org.apache.shardingsphere.sharding.merge.ddl.fetch;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Hold fetch order by value queues for current thread.
+ * Hold fetch order by value groups for current thread.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class FetchOrderByValueQueuesHolder {
+public final class FetchOrderByValueGroupsHolder {
     
-    private static final ThreadLocal<Map<String, Queue<OrderByValue>>> ORDER_BY_VALUE_QUEUES = ThreadLocal.withInitial(ConcurrentHashMap::new);
+    private static final ThreadLocal<Map<String, List<FetchOrderByValueGroup>>> ORDER_BY_VALUE_GROUPS = ThreadLocal.withInitial(ConcurrentHashMap::new);
     
-    private static final ThreadLocal<Map<String, Long>> REMAINING_ROW_COUNTS = ThreadLocal.withInitial(ConcurrentHashMap::new);
+    private static final ThreadLocal<Map<String, Long>> MIN_GROUP_ROW_COUNTS = ThreadLocal.withInitial(ConcurrentHashMap::new);
     
     /**
-     * Get order by value queues.
+     * Get order by value groups.
      *
-     * @return order by value queues
+     * @return order by value groups
      */
-    public static Map<String, Queue<OrderByValue>> getOrderByValueQueues() {
-        return ORDER_BY_VALUE_QUEUES.get();
+    public static Map<String, List<FetchOrderByValueGroup>> getOrderByValueGroups() {
+        return ORDER_BY_VALUE_GROUPS.get();
     }
     
     /**
-     * Get remaining row counts.
+     * Get min group row counts.
      *
-     * @return remaining row counts
+     * @return min group row counts
      */
-    public static Map<String, Long> getRemainingRowCounts() {
-        return REMAINING_ROW_COUNTS.get();
+    public static Map<String, Long> getMinGroupRowCounts() {
+        return MIN_GROUP_ROW_COUNTS.get();
     }
     
     /**
-     * Remove fetch order by value queues.
+     * Remove.
      */
     public static void remove() {
-        ORDER_BY_VALUE_QUEUES.remove();
-        REMAINING_ROW_COUNTS.remove();
+        ORDER_BY_VALUE_GROUPS.remove();
+        MIN_GROUP_ROW_COUNTS.remove();
     }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 47be48686db..8997f2579f2 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -31,7 +31,10 @@ import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -50,13 +53,14 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
     private boolean isFirstNext;
     
     public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema) throws SQLException {
-        String cursorName = fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
-        orderByValuesQueue = FetchOrderByValueQueuesHolder.getOrderByValueQueues().computeIfAbsent(cursorName, key -> new PriorityQueue<>(queryResults.size()));
+        orderByValuesQueue = new PriorityQueue<>(queryResults.size());
         directionType = fetchStatementContext.getSqlStatement().getDirection().map(DirectionSegment::getDirectionType).orElse(DirectionType.NEXT);
         fetchCount = fetchStatementContext.getSqlStatement().getDirection().flatMap(DirectionSegment::getCount).orElse(1L);
         SelectStatementContext selectStatementContext = fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
-        addOrderedResultSetsToQueue(queryResults, selectStatementContext, schema);
-        mergeRemainingRowCount(cursorName);
+        String cursorName = fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
+        List<FetchOrderByValueGroup> fetchOrderByValueGroups = getFetchOrderByValueGroups(queryResults, selectStatementContext, schema, cursorName);
+        addOrderedResultSetsToQueue(fetchOrderByValueGroups, queryResults);
+        setMinResultSetRowCount(cursorName);
         isFirstNext = true;
     }
     
@@ -81,12 +85,43 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         return DirectionType.isAllDirectionType(directionType) || fetchCount-- > 0;
     }
     
-    private void addOrderedResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
+    private List<FetchOrderByValueGroup> getFetchOrderByValueGroups(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
+                                                                    final ShardingSphereSchema schema, final String cursorName) throws SQLException {
+        long actualFetchCount = Math.max(fetchCount - FetchOrderByValueGroupsHolder.getMinGroupRowCounts().getOrDefault(cursorName, 0L), 0);
+        List<FetchOrderByValueGroup> result = FetchOrderByValueGroupsHolder.getOrderByValueGroups().computeIfAbsent(cursorName, key -> createFetchOrderByValueGroups(queryResults.size()));
+        result.forEach(each -> each.getOrderByValues().removeIf(this::isEmptyOrderByValue));
+        if (actualFetchCount <= 0 && !DirectionType.isAllDirectionType(directionType)) {
+            return result;
+        }
         Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
+        int index = 0;
         for (QueryResult each : queryResults) {
             QueryResult queryResult = decorate(each, selectStatementContext.getDatabaseType());
             OrderByValue orderByValue = new OrderByValue(queryResult, items, selectStatementContext, schema);
             if (orderByValue.next()) {
+                result.get(index).getOrderByValues().add(orderByValue);
+            }
+            index++;
+        }
+        return result;
+    }
+    
+    private List<FetchOrderByValueGroup> createFetchOrderByValueGroups(final int queryResultSize) {
+        List<FetchOrderByValueGroup> result = new ArrayList<>();
+        for (int index = 0; index < queryResultSize; index++) {
+            result.add(new FetchOrderByValueGroup());
+        }
+        return result;
+    }
+    
+    private boolean isEmptyOrderByValue(final OrderByValue orderByValue) {
+        return orderByValue.getQueryResult() instanceof JDBCMemoryQueryResult
+                && 0 == ((JDBCMemoryQueryResult) orderByValue.getQueryResult()).getRowCount() && ((JDBCMemoryQueryResult) orderByValue.getQueryResult()).wasNull();
+    }
+    
+    private void addOrderedResultSetsToQueue(final List<FetchOrderByValueGroup> fetchOrderByValueGroups, final List<QueryResult> queryResults) {
+        for (FetchOrderByValueGroup each : fetchOrderByValueGroups) {
+            for (OrderByValue orderByValue : each.getOrderByValues()) {
                 orderByValuesQueue.offer(orderByValue);
             }
         }
@@ -100,14 +135,24 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         return queryResult;
     }
     
-    private void mergeRemainingRowCount(final String cursorName) {
-        long remainingRowCount = 0L;
-        for (OrderByValue each : orderByValuesQueue) {
+    private void setMinResultSetRowCount(final String cursorName) {
+        Collection<Long> rowCounts = new LinkedList<>();
+        List<FetchOrderByValueGroup> fetchOrderByValueGroups = FetchOrderByValueGroupsHolder.getOrderByValueGroups().getOrDefault(cursorName, new LinkedList<>());
+        for (FetchOrderByValueGroup each : fetchOrderByValueGroups) {
+            rowCounts.add(getGroupRowCount(each));
+        }
+        long minResultSetRowCount = DirectionType.isAllDirectionType(directionType) ? 0 : Collections.min(rowCounts) - fetchCount;
+        FetchOrderByValueGroupsHolder.getMinGroupRowCounts().put(cursorName, Math.max(minResultSetRowCount, 0L));
+    }
+    
+    private long getGroupRowCount(final FetchOrderByValueGroup fetchOrderByValueGroup) {
+        long result = 0;
+        for (OrderByValue each : fetchOrderByValueGroup.getOrderByValues()) {
             if (each.getQueryResult() instanceof JDBCMemoryQueryResult) {
-                remainingRowCount += ((JDBCMemoryQueryResult) each.getQueryResult()).getRowCount();
+                JDBCMemoryQueryResult queryResult = (JDBCMemoryQueryResult) each.getQueryResult();
+                result += queryResult.wasNull() ? queryResult.getRowCount() : queryResult.getRowCount() + 1;
             }
         }
-        remainingRowCount = DirectionType.isAllDirectionType(directionType) ? 0 : remainingRowCount - fetchCount;
-        FetchOrderByValueQueuesHolder.getRemainingRowCounts().put(cursorName, Math.max(remainingRowCount, 0));
+        return result;
     }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/generator/impl/FetchDirectionTokenGenerator.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/generator/impl/FetchDirectionTokenGenerator.java
new file mode 100644
index 00000000000..659dcbd950b
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/generator/impl/FetchDirectionTokenGenerator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.rewrite.token.generator.impl;
+
+import com.google.common.base.Preconditions;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
+import org.apache.shardingsphere.infra.rewrite.sql.token.generator.OptionalSQLTokenGenerator;
+import org.apache.shardingsphere.infra.rewrite.sql.token.pojo.SQLToken;
+import org.apache.shardingsphere.sharding.rewrite.token.pojo.FetchDirectionToken;
+import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.CursorNameSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussFetchStatement;
+
+/**
+ * Fetch direction token generator.
+ */
+@Setter
+public final class FetchDirectionTokenGenerator implements OptionalSQLTokenGenerator<SQLStatementContext<?>> {
+    
+    @Override
+    public boolean isGenerateSQLToken(final SQLStatementContext<?> sqlStatementContext) {
+        return sqlStatementContext instanceof FetchStatementContext;
+    }
+    
+    @Override
+    public SQLToken generateSQLToken(final SQLStatementContext<?> sqlStatementContext) {
+        Preconditions.checkArgument(sqlStatementContext instanceof FetchStatementContext, "SQLStatementContext must be instance of FetchStatementContext.");
+        OpenGaussFetchStatement fetchStatement = ((FetchStatementContext) sqlStatementContext).getSqlStatement();
+        CursorNameSegment cursorName = fetchStatement.getCursorName();
+        int startIndex = fetchStatement.getDirection().map(DirectionSegment::getStartIndex).orElseGet(() -> cursorName.getStartIndex() - 1);
+        int stopIndex = fetchStatement.getDirection().map(DirectionSegment::getStopIndex).orElseGet(() -> cursorName.getStartIndex() - 1);
+        DirectionType directionType = fetchStatement.getDirection().map(DirectionSegment::getDirectionType).orElse(DirectionType.NEXT);
+        long fetchCount = fetchStatement.getDirection().flatMap(DirectionSegment::getCount).orElse(1L);
+        return new FetchDirectionToken(startIndex, stopIndex, directionType, fetchCount, cursorName.getIdentifier().getValue().toLowerCase());
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/pojo/FetchDirectionToken.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/pojo/FetchDirectionToken.java
new file mode 100644
index 00000000000..af7b0758da6
--- /dev/null
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/pojo/FetchDirectionToken.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.rewrite.token.pojo;
+
+import lombok.Getter;
+import org.apache.shardingsphere.infra.rewrite.sql.token.pojo.SQLToken;
+import org.apache.shardingsphere.infra.rewrite.sql.token.pojo.Substitutable;
+import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
+import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
+
+/**
+ * Fetch direction token.
+ */
+public final class FetchDirectionToken extends SQLToken implements Substitutable {
+    
+    @Getter
+    private final int stopIndex;
+    
+    private final DirectionType directionType;
+    
+    private final long fetchCount;
+    
+    private final String cursorName;
+    
+    public FetchDirectionToken(final int startIndex, final int stopIndex, final DirectionType directionType, final long fetchCount, final String cursorName) {
+        super(startIndex);
+        this.stopIndex = stopIndex;
+        this.directionType = directionType;
+        this.fetchCount = fetchCount;
+        this.cursorName = cursorName;
+    }
+    
+    @Override
+    public String toString() {
+        long actualFetchCount = Math.max(fetchCount - FetchOrderByValueGroupsHolder.getMinGroupRowCounts().getOrDefault(cursorName, 0L), 0);
+        if (DirectionType.isForwardCountDirectionType(directionType)) {
+            return " FORWARD " + actualFetchCount + " ";
+        }
+        if (DirectionType.isBackwardCountDirectionType(directionType)) {
+            return " BACKWARD " + actualFetchCount + " ";
+        }
+        if (DirectionType.ABSOLUTE_COUNT.equals(directionType)) {
+            return " ABSOLUTE " + actualFetchCount + " ";
+        }
+        if (DirectionType.RELATIVE_COUNT.equals(directionType)) {
+            return " RELATIVE " + actualFetchCount + " ";
+        }
+        return directionType.getName();
+    }
+}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/pojo/ShardingTokenGenerateBuilder.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/pojo/ShardingTokenGenerateBuilder.java
index e77ce24e244..afc3f152a7b 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/pojo/ShardingTokenGenerateBuilder.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/rewrite/token/pojo/ShardingTokenGenerateBuilder.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.Aggregati
 import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.ConstraintTokenGenerator;
 import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.CursorTokenGenerator;
 import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.DistinctProjectionPrefixTokenGenerator;
+import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.FetchDirectionTokenGenerator;
 import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.IndexTokenGenerator;
 import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.OffsetTokenGenerator;
 import org.apache.shardingsphere.sharding.rewrite.token.generator.impl.OrderByTokenGenerator;
@@ -77,6 +78,7 @@ public final class ShardingTokenGenerateBuilder implements SQLTokenGeneratorBuil
         addSQLTokenGenerator(result, new GeneratedKeyInsertValuesTokenGenerator());
         addSQLTokenGenerator(result, new ShardingRemoveTokenGenerator());
         addSQLTokenGenerator(result, new CursorTokenGenerator());
+        addSQLTokenGenerator(result, new FetchDirectionTokenGenerator());
         return result;
     }
     
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
index 177179f9224..b8e3001208e 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.sharding.merge.ddl.fetch;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.PriorityQueue;
+import java.util.LinkedList;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -29,19 +29,19 @@ public final class FetchOrderByValueQueuesHolderTest {
     
     @Before
     public void setUp() {
-        FetchOrderByValueQueuesHolder.remove();
+        FetchOrderByValueGroupsHolder.remove();
     }
     
     @Test
     public void assertFetchOrderByValueQueuesHolder() {
-        assertFalse(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
-        assertFalse(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
-        FetchOrderByValueQueuesHolder.getOrderByValueQueues().computeIfAbsent("t_order_cursor", key -> new PriorityQueue<>());
-        FetchOrderByValueQueuesHolder.getRemainingRowCounts().put("t_order_cursor", 0L);
-        assertTrue(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
-        assertTrue(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
-        FetchOrderByValueQueuesHolder.remove();
-        assertFalse(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
-        assertFalse(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
+        assertFalse(FetchOrderByValueGroupsHolder.getOrderByValueGroups().containsKey("t_order_cursor"));
+        assertFalse(FetchOrderByValueGroupsHolder.getMinGroupRowCounts().containsKey("t_order_cursor"));
+        FetchOrderByValueGroupsHolder.getOrderByValueGroups().computeIfAbsent("t_order_cursor", key -> new LinkedList<>());
+        FetchOrderByValueGroupsHolder.getMinGroupRowCounts().put("t_order_cursor", 0L);
+        assertTrue(FetchOrderByValueGroupsHolder.getOrderByValueGroups().containsKey("t_order_cursor"));
+        assertTrue(FetchOrderByValueGroupsHolder.getMinGroupRowCounts().containsKey("t_order_cursor"));
+        FetchOrderByValueGroupsHolder.remove();
+        assertFalse(FetchOrderByValueGroupsHolder.getOrderByValueGroups().containsKey("t_order_cursor"));
+        assertFalse(FetchOrderByValueGroupsHolder.getMinGroupRowCounts().containsKey("t_order_cursor"));
     }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
index db3b20a5b1a..69247256601 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResultTest.java
@@ -67,6 +67,7 @@ public final class FetchStreamMergedResultTest {
         resultMerger = new ShardingDDLResultMerger();
         database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
         when(database.getName()).thenReturn(DefaultDatabase.LOGIC_NAME);
+        FetchOrderByValueGroupsHolder.remove();
     }
     
     private OpenGaussFetchStatement createFetchStatement(final boolean containsAllDirectionType) {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 7c4c464d89b..e28ba5215ea 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSp
 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
+import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
 import org.apache.shardingsphere.traffic.context.TrafficContextHolder;
 import org.apache.shardingsphere.transaction.TransactionHolder;
 
@@ -171,7 +171,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
             connectionManager.getConnectionTransaction().setRollbackOnly(false);
             TransactionHolder.clear();
             TrafficContextHolder.remove();
-            FetchOrderByValueQueuesHolder.remove();
+            FetchOrderByValueGroupsHolder.remove();
         }
     }
     
@@ -183,7 +183,7 @@ public final class ShardingSphereConnection extends AbstractConnectionAdapter {
             connectionManager.getConnectionTransaction().setRollbackOnly(false);
             TransactionHolder.clear();
             TrafficContextHolder.remove();
-            FetchOrderByValueQueuesHolder.remove();
+            FetchOrderByValueGroupsHolder.remove();
         }
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
index 9967dd0cbad..683f3b74590 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/JDBCBackendTransactionManager.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.proxy.backend.communication.TransactionManager;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
+import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
 import org.apache.shardingsphere.transaction.ConnectionSavepointManager;
 import org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
 import org.apache.shardingsphere.transaction.TransactionHolder;
@@ -87,7 +87,7 @@ public final class JDBCBackendTransactionManager implements TransactionManager<V
                 connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
                 connection.getConnectionSession().getTransactionStatus().setRollbackOnly(false);
                 TransactionHolder.clear();
-                FetchOrderByValueQueuesHolder.remove();
+                FetchOrderByValueGroupsHolder.remove();
             }
         }
         return null;
@@ -106,7 +106,7 @@ public final class JDBCBackendTransactionManager implements TransactionManager<V
                 connection.getConnectionSession().getTransactionStatus().setInTransaction(false);
                 connection.getConnectionSession().getTransactionStatus().setRollbackOnly(false);
                 TransactionHolder.clear();
-                FetchOrderByValueQueuesHolder.remove();
+                FetchOrderByValueGroupsHolder.remove();
             }
         }
         return null;
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index 7fef747d3af..51c660334a1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -36,7 +36,7 @@ import org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.text.data.DatabaseBackendHandler;
-import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
+import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -101,8 +101,8 @@ public final class SchemaAssignedDatabaseBackendHandler implements DatabaseBacke
             ((CursorDefinitionAware) statementContext).setUpCursorDefinition(cursorStatementContext);
         }
         if (statementContext instanceof CloseStatementContext) {
-            FetchOrderByValueQueuesHolder.getOrderByValueQueues().remove(cursorName);
-            FetchOrderByValueQueuesHolder.getRemainingRowCounts().remove(cursorName);
+            FetchOrderByValueGroupsHolder.getOrderByValueGroups().remove(cursorName);
+            FetchOrderByValueGroupsHolder.getMinGroupRowCounts().remove(cursorName);
             connectionSession.getCursorDefinitions().remove(cursorName);
         }
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
index dea51959490..24472b886ea 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/transaction/TransactionXAHandler.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import org.apache.shardingsphere.proxy.backend.text.data.impl.SchemaAssignedDatabaseBackendHandler;
-import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueQueuesHolder;
+import org.apache.shardingsphere.sharding.merge.ddl.fetch.FetchOrderByValueGroupsHolder;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.XAStatement;
 import org.apache.shardingsphere.transaction.TransactionHolder;
@@ -88,7 +88,7 @@ public final class TransactionXAHandler implements TextProtocolBackendHandler {
                 } finally {
                     connectionSession.getTransactionStatus().setManualXA(false);
                     TransactionHolder.clear();
-                    FetchOrderByValueQueuesHolder.remove();
+                    FetchOrderByValueGroupsHolder.remove();
                 }
             default:
                 throw new SQLException("unrecognized XA statement " + tclStatement.getOp());
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-opengauss/src/main/antlr4/imports/opengauss/DDLStatement.g4 b/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-opengauss/src/main/antlr4/imports/opengauss/DDLStatement.g4
index cfde9422749..47bb8b140e2 100644
--- a/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-opengauss/src/main/antlr4/imports/opengauss/DDLStatement.g4
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-dialect/shardingsphere-sql-parser-opengauss/src/main/antlr4/imports/opengauss/DDLStatement.g4
@@ -1972,7 +1972,7 @@ dropSchema
     ;
 
 fetch
-    : FETCH (direction (FROM | IN))? cursorName
+    : FETCH (direction (FROM | IN)?)? cursorName
     ;
 
 direction
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
index c97d418149a..9df3f72ad71 100644
--- a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
@@ -17,19 +17,58 @@
 
 package org.apache.shardingsphere.sql.parser.sql.common.constant;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
 /**
  * Direction type enum.
  */
+@RequiredArgsConstructor
+@Getter
 public enum DirectionType {
     
-    NEXT, PRIOR, FIRST, LAST, ABSOLUTE_COUNT, RELATIVE_COUNT, COUNT, ALL, FORWARD, FORWARD_COUNT, FORWARD_ALL, BACKWARD, BACKWARD_COUNT, BACKWARD_ALL;
+    NEXT("NEXT"),
+    
+    PRIOR("PRIOR"),
+    
+    FIRST("FIRST"),
+    
+    LAST("LAST"),
+    
+    ABSOLUTE_COUNT("ABSOLUTE"),
+    
+    RELATIVE_COUNT("RELATIVE"),
+    
+    COUNT(""),
+    
+    ALL("ALL"),
+    
+    FORWARD("FORWARD"),
+    
+    FORWARD_COUNT("FORWARD"),
+    
+    FORWARD_ALL("FORWARD ALL"),
+    
+    BACKWARD("BACKWARD"),
+    
+    BACKWARD_COUNT("BACKWARD"),
+    
+    BACKWARD_ALL("BACKWARD ALL");
     
     private static final Set<DirectionType> ALL_DIRECTION_TYPES = new HashSet<>(Arrays.asList(ALL, FORWARD_ALL, BACKWARD_ALL));
     
+    private static final Collection<DirectionType> FORWARD_COUNT_DIRECTION_TYPES =
+            new HashSet<>(Arrays.asList(DirectionType.NEXT, DirectionType.COUNT, DirectionType.FORWARD, DirectionType.FORWARD_COUNT));
+    
+    private static final Collection<DirectionType> BACKWARD_COUNT_DIRECTION_TYPES = new HashSet<>(Arrays.asList(DirectionType.PRIOR, DirectionType.BACKWARD, DirectionType.BACKWARD_COUNT));
+    
+    private final String name;
+    
     /**
      * Is all direction type.
      * 
@@ -39,4 +78,24 @@ public enum DirectionType {
     public static boolean isAllDirectionType(final DirectionType directionType) {
         return ALL_DIRECTION_TYPES.contains(directionType);
     }
+    
+    /**
+     * Is forward count direction type.
+     *
+     * @param directionType direction type
+     * @return is forward count direction type or not
+     */
+    public static boolean isForwardCountDirectionType(final DirectionType directionType) {
+        return FORWARD_COUNT_DIRECTION_TYPES.contains(directionType);
+    }
+    
+    /**
+     * Is backward count direction type.
+     *
+     * @param directionType direction type
+     * @return is backward count direction type or not
+     */
+    public static boolean isBackwardCountDirectionType(final DirectionType directionType) {
+        return BACKWARD_COUNT_DIRECTION_TYPES.contains(directionType);
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-rewrite-test/src/test/resources/scenario/sharding/case/fetch.xml b/shardingsphere-test/shardingsphere-rewrite-test/src/test/resources/scenario/sharding/case/fetch.xml
index 54a03d5729d..1418c62eac8 100644
--- a/shardingsphere-test/shardingsphere-rewrite-test/src/test/resources/scenario/sharding/case/fetch.xml
+++ b/shardingsphere-test/shardingsphere-rewrite-test/src/test/resources/scenario/sharding/case/fetch.xml
@@ -19,11 +19,151 @@
 <rewrite-assertions yaml-rule="scenario/sharding/config/sharding-rule.yaml">
     <rewrite-assertion id="fetch_cursor" db-types="openGauss">
         <input sql="FETCH t_account_cursor" />
-        <output sql="FETCH t_account_cursor_t_account_0" />
+        <output sql="FETCH FORWARD 1 t_account_cursor_t_account_0" />
     </rewrite-assertion>
 
     <rewrite-assertion id="fetch_cursor_with_qualified_name" db-types="openGauss">
         <input sql="FETCH &quot;t_account_cursor&quot;" />
-        <output sql="FETCH &quot;t_account_cursor_t_account_0&quot;" />
+        <output sql="FETCH FORWARD 1 &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_next" db-types="openGauss">
+        <input sql="FETCH NEXT FROM t_account_cursor" />
+        <output sql="FETCH  FORWARD 1  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_next_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH NEXT FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  FORWARD 1  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_prior" db-types="openGauss">
+        <input sql="FETCH PRIOR FROM t_account_cursor" />
+        <output sql="FETCH  BACKWARD 1  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_prior_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH PRIOR FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  BACKWARD 1  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_first" db-types="openGauss">
+        <input sql="FETCH FIRST FROM t_account_cursor" />
+        <output sql="FETCH FIRST FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_first_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH FIRST FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH FIRST FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_last" db-types="openGauss">
+        <input sql="FETCH LAST FROM t_account_cursor" />
+        <output sql="FETCH LAST FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_last_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH LAST FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH LAST FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_absolute_count" db-types="openGauss">
+        <input sql="FETCH ABSOLUTE 10 FROM t_account_cursor" />
+        <output sql="FETCH  ABSOLUTE 10  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_absolute_count_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH ABSOLUTE 10 FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  ABSOLUTE 10  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_relative_count" db-types="openGauss">
+        <input sql="FETCH RELATIVE 10 FROM t_account_cursor" />
+        <output sql="FETCH  RELATIVE 10  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_relative_count_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH RELATIVE 10 FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  RELATIVE 10  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_count" db-types="openGauss">
+        <input sql="FETCH 10 FROM t_account_cursor" />
+        <output sql="FETCH  FORWARD 10  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_count_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH 10 FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  FORWARD 10  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_all" db-types="openGauss">
+        <input sql="FETCH ALL FROM t_account_cursor" />
+        <output sql="FETCH ALL FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_all_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH ALL FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH ALL FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_forward" db-types="openGauss">
+        <input sql="FETCH FORWARD FROM t_account_cursor" />
+        <output sql="FETCH  FORWARD 1  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_forward_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH FORWARD FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  FORWARD 1  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_forward_count" db-types="openGauss">
+        <input sql="FETCH FORWARD 1 FROM t_account_cursor" />
+        <output sql="FETCH  FORWARD 1  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_forward_count_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH FORWARD 1 FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  FORWARD 1  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_forward_all" db-types="openGauss">
+        <input sql="FETCH FORWARD ALL FROM t_account_cursor" />
+        <output sql="FETCH FORWARD ALL FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_forward_all_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH FORWARD ALL FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH FORWARD ALL FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_backward" db-types="openGauss">
+        <input sql="FETCH BACKWARD FROM t_account_cursor" />
+        <output sql="FETCH  BACKWARD 1  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_backward_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH BACKWARD FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  BACKWARD 1  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_backward_count" db-types="openGauss">
+        <input sql="FETCH BACKWARD 1 FROM t_account_cursor" />
+        <output sql="FETCH  BACKWARD 1  FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_backward_count_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH BACKWARD 1 FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH  BACKWARD 1  FROM &quot;t_account_cursor_t_account_0&quot;" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_backward_all" db-types="openGauss">
+        <input sql="FETCH BACKWARD ALL FROM t_account_cursor" />
+        <output sql="FETCH BACKWARD ALL FROM t_account_cursor_t_account_0" />
+    </rewrite-assertion>
+
+    <rewrite-assertion id="fetch_cursor_with_backward_all_with_qualified_name" db-types="openGauss">
+        <input sql="FETCH BACKWARD ALL FROM &quot;t_account_cursor&quot;" />
+        <output sql="FETCH BACKWARD ALL FROM &quot;t_account_cursor_t_account_0&quot;" />
     </rewrite-assertion>
 </rewrite-assertions>