You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/10/11 05:40:23 UTC
[shardingsphere] branch master updated: Fix fetch forward all error in openGauss(#21421) (#21471)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bcde6f374c4 Fix fetch forward all error in openGauss(#21421) (#21471)
bcde6f374c4 is described below
commit bcde6f374c4a3a025173fbc9f6d0e66ed686a042
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Tue Oct 11 13:40:15 2022 +0800
Fix fetch forward all error in openGauss(#21421) (#21471)
* Fix fetch forward all error in openGauss(#21421)
* Fix fetch all
* Fix
---
.../merge/ddl/fetch/FetchStreamMergedResult.java | 19 +++++++++++++++++++
.../infra/context/cursor/CursorConnectionContext.java | 4 ++++
2 files changed, 23 insertions(+)
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 4c9db958ace..2edf9fe5a62 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -55,6 +55,8 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
private boolean isFirstNext;
+ private boolean isExecutedAllDirection;
+
public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext,
final ShardingSphereSchema schema, final ConnectionContext connectionContext) throws SQLException {
orderByValuesQueue = new PriorityQueue<>(queryResults.size());
@@ -65,11 +67,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
List<FetchOrderByValueGroup> fetchOrderByValueGroups = getFetchOrderByValueGroups(queryResults, selectStatementContext, schema, cursorName, connectionContext);
addOrderedResultSetsToQueue(fetchOrderByValueGroups, queryResults);
setMinResultSetRowCount(cursorName, connectionContext);
+ handleExecutedAllDirections(connectionContext, cursorName);
isFirstNext = true;
}
@Override
public boolean next() throws SQLException {
+ if (isExecutedAllDirection) {
+ return false;
+ }
if (orderByValuesQueue.isEmpty()) {
return false;
}
@@ -101,6 +107,10 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
if (actualFetchCount <= 0 && !DirectionType.isAllDirectionType(directionType)) {
return result;
}
+ if (connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName)) {
+ result.forEach(each -> each.getOrderByValues().clear());
+ return result;
+ }
Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
int index = 0;
for (QueryResult each : queryResults) {
@@ -153,6 +163,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
connectionContext.getCursorConnectionContext().getMinGroupRowCounts().put(cursorName, Math.max(minResultSetRowCount, 0L));
}
+ private void handleExecutedAllDirections(final ConnectionContext connectionContext, final String cursorName) {
+ if (connectionContext.getCursorConnectionContext().getExecutedAllDirections().containsKey(cursorName)) {
+ isExecutedAllDirection = true;
+ }
+ if (DirectionType.isAllDirectionType(directionType)) {
+ connectionContext.getCursorConnectionContext().getExecutedAllDirections().put(cursorName, true);
+ }
+ }
+
private long getGroupRowCount(final FetchOrderByValueGroup fetchOrderByValueGroup) {
long result = 0;
for (OrderByValue each : fetchOrderByValueGroup.getOrderByValues()) {
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
index d4204937008..e3a0870d46f 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/context/cursor/CursorConnectionContext.java
@@ -35,11 +35,14 @@ public final class CursorConnectionContext implements AutoCloseable {
private final Map<String, CursorDefinition> cursorDefinitions = new ConcurrentHashMap<>();
+ private final Map<String, Boolean> executedAllDirections = new ConcurrentHashMap<>();
+
@Override
public void close() {
orderByValueGroups.clear();
minGroupRowCounts.clear();
cursorDefinitions.clear();
+ executedAllDirections.clear();
}
/**
@@ -51,5 +54,6 @@ public final class CursorConnectionContext implements AutoCloseable {
orderByValueGroups.remove(name);
minGroupRowCounts.remove(name);
cursorDefinitions.remove(name);
+ executedAllDirections.remove(name);
}
}