You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/10/11 05:40:23 UTC

[shardingsphere] branch master updated: Fix fetch forward all error in openGauss(#21421) (#21471)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bcde6f374c4 Fix fetch forward all error in openGauss(#21421) (#21471)
bcde6f374c4 is described below

commit bcde6f374c4a3a025173fbc9f6d0e66ed686a042
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Tue Oct 11 13:40:15 2022 +0800

    Fix fetch forward all error in openGauss(#21421) (#21471)
    
    * Fix fetch forward all error in openGauss(#21421)
    
    * Fix fetch all
    
    * Fix
---
 .../merge/ddl/fetch/FetchStreamMergedResult.java      | 19 +++++++++++++++++++
 .../infra/context/cursor/CursorConnectionContext.java |  4 ++++
 2 files changed, 23 insertions(+)

diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 4c9db958ace..2edf9fe5a62 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -55,6 +55,8 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
     
     private boolean isFirstNext;
     
+    private boolean isExecutedAllDirection;
+    
     public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext,
                                    final ShardingSphereSchema schema, final ConnectionContext connectionContext) throws SQLException {
         orderByValuesQueue = new PriorityQueue<>(queryResults.size());
@@ -65,11 +67,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         List<FetchOrderByValueGroup> fetchOrderByValueGroups = getFetchOrderByValueGroups(queryResults, selectStatementContext, schema, cursorName, connectionContext);
         addOrderedResultSetsToQueue(fetchOrderByValueGroups, queryResults);
         setMinResultSetRowCount(cursorName, connectionContext);
+        handleExecutedAllDirections(connectionContext, cursorName);
         isFirstNext = true;
     }
     
     @Override
     public boolean next() throws SQLException {
+        if (isExecutedAllDirection) {
+            return false;
+        }
         if (orderByValuesQueue.isEmpty()) {
             return false;
         }
@@ -101,6 +107,10 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         if (actualFetchCount <= 0 && !DirectionType.isAllDirectionType(directionType)) {
             return result;
         }
+        if (connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName)) {
+            result.forEach(each -> each.getOrderByValues().clear());
+            return result;
+        }
         Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
         int index = 0;
         for (QueryResult each : queryResults) {
@@ -153,6 +163,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         connectionContext.getCursorConnectionContext().getMinGroupRowCounts().put(cursorName, Math.max(minResultSetRowCount, 0L));
     }
     
+    private void handleExecutedAllDirections(final ConnectionContext connectionContext, final String cursorName) {
+        if (connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName)) {
+            isExecutedAllDirection = true;
+        }
+        if (DirectionType.isAllDirectionType(directionType)) {
+            connectionContext.getCursorConnectionContext().getExecutedAllDirections().put(cursorName, true);
+        }
+    }
+    
     private long getGroupRowCount(final FetchOrderByValueGroup fetchOrderByValueGroup) {
         long result = 0;
         for (OrderByValue each : fetchOrderByValueGroup.getOrderByValues()) {
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
index d4204937008..e3a0870d46f 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
@@ -35,11 +35,14 @@ public final class CursorConnectionContext implements AutoCloseable {
     
     private final Map<String, CursorDefinition> cursorDefinitions = new ConcurrentHashMap<>();
     
+    private final Map<String, Boolean> executedAllDirections = new ConcurrentHashMap<>();
+    
     @Override
     public void close() {
         orderByValueGroups.clear();
         minGroupRowCounts.clear();
         cursorDefinitions.clear();
+        executedAllDirections.clear();
     }
     
     /**
@@ -51,5 +54,6 @@ public final class CursorConnectionContext implements AutoCloseable {
         orderByValueGroups.remove(name);
         minGroupRowCounts.remove(name);
         cursorDefinitions.remove(name);
+        executedAllDirections.remove(name);
     }
 }