You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/06/09 04:09:50 UTC
[shardingsphere] branch master updated: Add remainingRowCount for fetch statement to reduce result set hold in memory (#18254)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1c7733837d7 Add remainingRowCount for fetch statement to reduce result set hold in memory (#18254)
1c7733837d7 is described below
commit 1c7733837d7e45009750c27a5c41db6da7d4d52c
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Thu Jun 9 12:09:43 2022 +0800
Add remainingRowCount for fetch statement to reduce result set hold in memory (#18254)
---
.../ddl/fetch/FetchOrderByValueQueuesHolder.java | 18 +++++--
.../merge/ddl/fetch/FetchStreamMergedResult.java | 56 +++++++++++++---------
.../fetch/FetchOrderByValueQueuesHolderTest.java | 14 ++++--
.../jdbc/type/memory/JDBCMemoryQueryResult.java | 2 +-
.../query/impl/raw/type/RawMemoryQueryResult.java | 2 +-
.../type/memory/AbstractMemoryQueryResult.java | 14 ++++--
.../type/memory/JDBCMemoryQueryResultTest.java | 10 ++++
.../impl/SchemaAssignedDatabaseBackendHandler.java | 3 +-
8 files changed, 83 insertions(+), 36 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
index f06927506cf..3e005f70aa4 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
@@ -33,19 +33,31 @@ public final class FetchOrderByValueQueuesHolder {
private static final ThreadLocal<Map<String, Queue<OrderByValue>>> ORDER_BY_VALUE_QUEUES = ThreadLocal.withInitial(ConcurrentHashMap::new);
+ private static final ThreadLocal<Map<String, Long>> REMAINING_ROW_COUNTS = ThreadLocal.withInitial(ConcurrentHashMap::new);
+
/**
- * Get fetch order by value queues.
+ * Get order by value queues.
*
- * @return fetch order by value queues
+ * @return order by value queues
*/
- public static Map<String, Queue<OrderByValue>> get() {
+ public static Map<String, Queue<OrderByValue>> getOrderByValueQueues() {
return ORDER_BY_VALUE_QUEUES.get();
}
+ /**
+ * Get remaining row counts.
+ *
+ * @return remaining row counts
+ */
+ public static Map<String, Long> getRemainingRowCounts() {
+ return REMAINING_ROW_COUNTS.get();
+ }
+
/**
* Remove fetch order by value queues.
*/
public static void remove() {
ORDER_BY_VALUE_QUEUES.remove();
+ REMAINING_ROW_COUNTS.remove();
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index c59422552f0..47be48686db 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -51,33 +51,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema) throws SQLException {
String cursorName = fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
- SelectStatementContext selectStatementContext = fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
- orderByValuesQueue = FetchOrderByValueQueuesHolder.get().computeIfAbsent(cursorName, key -> new PriorityQueue<>(queryResults.size()));
- orderResultSetsToQueue(queryResults, selectStatementContext, schema);
+ orderByValuesQueue = FetchOrderByValueQueuesHolder.getOrderByValueQueues().computeIfAbsent(cursorName, key -> new PriorityQueue<>(queryResults.size()));
directionType = fetchStatementContext.getSqlStatement().getDirection().map(DirectionSegment::getDirectionType).orElse(DirectionType.NEXT);
fetchCount = fetchStatementContext.getSqlStatement().getDirection().flatMap(DirectionSegment::getCount).orElse(1L);
+ SelectStatementContext selectStatementContext = fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
+ addOrderedResultSetsToQueue(queryResults, selectStatementContext, schema);
+ mergeRemainingRowCount(cursorName);
isFirstNext = true;
}
- private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
- Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
- for (QueryResult each : queryResults) {
- QueryResult queryResult = decorate(each, selectStatementContext.getDatabaseType());
- OrderByValue orderByValue = new OrderByValue(queryResult, items, selectStatementContext, schema);
- if (orderByValue.next()) {
- orderByValuesQueue.offer(orderByValue);
- }
- }
- setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
- }
-
- private QueryResult decorate(final QueryResult queryResult, final DatabaseType databaseType) throws SQLException {
- if (!DirectionType.isAllDirectionType(directionType) && queryResult instanceof JDBCStreamQueryResult) {
- return new JDBCMemoryQueryResult(((JDBCStreamQueryResult) queryResult).getResultSet(), databaseType);
- }
- return queryResult;
- }
-
@Override
public boolean next() throws SQLException {
if (orderByValuesQueue.isEmpty()) {
@@ -98,4 +80,34 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
return DirectionType.isAllDirectionType(directionType) || fetchCount-- > 0;
}
+
+ private void addOrderedResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
+ Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
+ for (QueryResult each : queryResults) {
+ QueryResult queryResult = decorate(each, selectStatementContext.getDatabaseType());
+ OrderByValue orderByValue = new OrderByValue(queryResult, items, selectStatementContext, schema);
+ if (orderByValue.next()) {
+ orderByValuesQueue.offer(orderByValue);
+ }
+ }
+ setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
+ }
+
+ private QueryResult decorate(final QueryResult queryResult, final DatabaseType databaseType) throws SQLException {
+ if (!DirectionType.isAllDirectionType(directionType) && queryResult instanceof JDBCStreamQueryResult) {
+ return new JDBCMemoryQueryResult(((JDBCStreamQueryResult) queryResult).getResultSet(), databaseType);
+ }
+ return queryResult;
+ }
+
+ private void mergeRemainingRowCount(final String cursorName) {
+ long remainingRowCount = 0L;
+ for (OrderByValue each : orderByValuesQueue) {
+ if (each.getQueryResult() instanceof JDBCMemoryQueryResult) {
+ remainingRowCount += ((JDBCMemoryQueryResult) each.getQueryResult()).getRowCount();
+ }
+ }
+ remainingRowCount = DirectionType.isAllDirectionType(directionType) ? 0 : remainingRowCount - fetchCount;
+ FetchOrderByValueQueuesHolder.getRemainingRowCounts().put(cursorName, Math.max(remainingRowCount, 0));
+ }
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
index b31d48f009e..177179f9224 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
@@ -33,11 +33,15 @@ public final class FetchOrderByValueQueuesHolderTest {
}
@Test
- public void assertTrafficContextHolder() {
- assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
- FetchOrderByValueQueuesHolder.get().computeIfAbsent("t_order_cursor", key -> new PriorityQueue<>());
- assertTrue(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+ public void assertFetchOrderByValueQueuesHolder() {
+ assertFalse(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
+ assertFalse(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
+ FetchOrderByValueQueuesHolder.getOrderByValueQueues().computeIfAbsent("t_order_cursor", key -> new PriorityQueue<>());
+ FetchOrderByValueQueuesHolder.getRemainingRowCounts().put("t_order_cursor", 0L);
+ assertTrue(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
+ assertTrue(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
FetchOrderByValueQueuesHolder.remove();
- assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+ assertFalse(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
+ assertFalse(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java
index f6024989569..043b6c79ce3 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java
@@ -31,6 +31,6 @@ import java.sql.SQLException;
public final class JDBCMemoryQueryResult extends AbstractMemoryQueryResult {
public JDBCMemoryQueryResult(final ResultSet resultSet, final DatabaseType databaseType) throws SQLException {
- super(new JDBCQueryResultMetaData(resultSet.getMetaData()), DialectJDBCRowsLoaderFactory.getInstance(databaseType).load(resultSet.getMetaData().getColumnCount(), resultSet).iterator());
+ super(new JDBCQueryResultMetaData(resultSet.getMetaData()), DialectJDBCRowsLoaderFactory.getInstance(databaseType).load(resultSet.getMetaData().getColumnCount(), resultSet));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java
index 67d542d5ca9..7b06e2bcf43 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java
@@ -29,6 +29,6 @@ import java.util.List;
public final class RawMemoryQueryResult extends AbstractMemoryQueryResult {
public RawMemoryQueryResult(final QueryResultMetaData metaData, final List<MemoryQueryResultDataRow> rows) {
- super(metaData, rows.iterator());
+ super(metaData, rows);
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java
index 10ed511ab93..500c1ba5846 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java
@@ -17,9 +17,7 @@
package org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory;
-import lombok.AccessLevel;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
@@ -31,12 +29,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.util.Calendar;
+import java.util.Collection;
import java.util.Iterator;
/**
* Abstract memory query result.
*/
-@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public abstract class AbstractMemoryQueryResult implements QueryResult {
@Getter
@@ -46,10 +44,20 @@ public abstract class AbstractMemoryQueryResult implements QueryResult {
private MemoryQueryResultDataRow currentRow;
+ @Getter
+ private long rowCount;
+
+ protected AbstractMemoryQueryResult(final QueryResultMetaData metaData, final Collection<MemoryQueryResultDataRow> rows) {
+ this.metaData = metaData;
+ this.rows = rows.iterator();
+ rowCount = rows.size();
+ }
+
@Override
public final boolean next() {
if (rows.hasNext()) {
currentRow = rows.next();
+ rowCount--;
return true;
}
currentRow = null;
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java
index fa89a901068..afad05bcf5c 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java
@@ -364,6 +364,16 @@ public final class JDBCMemoryQueryResultTest {
assertTrue(queryResult.wasNull());
}
+ @Test
+ public void assertGetRowCount() throws SQLException {
+ JDBCMemoryQueryResult queryResult = new JDBCMemoryQueryResult(mockResultSet(), databaseType);
+ assertThat(queryResult.getRowCount(), is(1L));
+ queryResult.next();
+ assertThat(queryResult.getRowCount(), is(0L));
+ queryResult.next();
+ assertThat(queryResult.getRowCount(), is(0L));
+ }
+
private ResultSet mockResultSet() throws SQLException {
ResultSet result = mock(ResultSet.class);
when(result.next()).thenReturn(true).thenReturn(false);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index 051dcfdb6f9..7fef747d3af 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -101,7 +101,8 @@ public final class SchemaAssignedDatabaseBackendHandler implements DatabaseBacke
((CursorDefinitionAware) statementContext).setUpCursorDefinition(cursorStatementContext);
}
if (statementContext instanceof CloseStatementContext) {
- FetchOrderByValueQueuesHolder.get().remove(cursorName);
+ FetchOrderByValueQueuesHolder.getOrderByValueQueues().remove(cursorName);
+ FetchOrderByValueQueuesHolder.getRemainingRowCounts().remove(cursorName);
connectionSession.getCursorDefinitions().remove(cursorName);
}
}