You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/06/09 04:09:50 UTC

[shardingsphere] branch master updated: Add remainingRowCount for fetch statement to reduce result set hold in memory (#18254)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c7733837d7 Add remainingRowCount for fetch statement to reduce result set hold in memory (#18254)
1c7733837d7 is described below

commit 1c7733837d7e45009750c27a5c41db6da7d4d52c
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Thu Jun 9 12:09:43 2022 +0800

    Add remainingRowCount for fetch statement to reduce result set hold in memory (#18254)
---
 .../ddl/fetch/FetchOrderByValueQueuesHolder.java   | 18 +++++--
 .../merge/ddl/fetch/FetchStreamMergedResult.java   | 56 +++++++++++++---------
 .../fetch/FetchOrderByValueQueuesHolderTest.java   | 14 ++++--
 .../jdbc/type/memory/JDBCMemoryQueryResult.java    |  2 +-
 .../query/impl/raw/type/RawMemoryQueryResult.java  |  2 +-
 .../type/memory/AbstractMemoryQueryResult.java     | 14 ++++--
 .../type/memory/JDBCMemoryQueryResultTest.java     | 10 ++++
 .../impl/SchemaAssignedDatabaseBackendHandler.java |  3 +-
 8 files changed, 83 insertions(+), 36 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
index f06927506cf..3e005f70aa4 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolder.java
@@ -33,19 +33,31 @@ public final class FetchOrderByValueQueuesHolder {
     
     private static final ThreadLocal<Map<String, Queue<OrderByValue>>> ORDER_BY_VALUE_QUEUES = ThreadLocal.withInitial(ConcurrentHashMap::new);
     
+    private static final ThreadLocal<Map<String, Long>> REMAINING_ROW_COUNTS = ThreadLocal.withInitial(ConcurrentHashMap::new);
+    
     /**
-     * Get fetch order by value queues.
+     * Get order by value queues.
      *
-     * @return fetch order by value queues
+     * @return order by value queues
      */
-    public static Map<String, Queue<OrderByValue>> get() {
+    public static Map<String, Queue<OrderByValue>> getOrderByValueQueues() {
         return ORDER_BY_VALUE_QUEUES.get();
     }
     
+    /**
+     * Get remaining row counts.
+     *
+     * @return remaining row counts
+     */
+    public static Map<String, Long> getRemainingRowCounts() {
+        return REMAINING_ROW_COUNTS.get();
+    }
+    
     /**
      * Remove fetch order by value queues.
      */
     public static void remove() {
         ORDER_BY_VALUE_QUEUES.remove();
+        REMAINING_ROW_COUNTS.remove();
     }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index c59422552f0..47be48686db 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -51,33 +51,15 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
     
     public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema) throws SQLException {
         String cursorName = fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
-        SelectStatementContext selectStatementContext = fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
-        orderByValuesQueue = FetchOrderByValueQueuesHolder.get().computeIfAbsent(cursorName, key -> new PriorityQueue<>(queryResults.size()));
-        orderResultSetsToQueue(queryResults, selectStatementContext, schema);
+        orderByValuesQueue = FetchOrderByValueQueuesHolder.getOrderByValueQueues().computeIfAbsent(cursorName, key -> new PriorityQueue<>(queryResults.size()));
         directionType = fetchStatementContext.getSqlStatement().getDirection().map(DirectionSegment::getDirectionType).orElse(DirectionType.NEXT);
         fetchCount = fetchStatementContext.getSqlStatement().getDirection().flatMap(DirectionSegment::getCount).orElse(1L);
+        SelectStatementContext selectStatementContext = fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
+        addOrderedResultSetsToQueue(queryResults, selectStatementContext, schema);
+        mergeRemainingRowCount(cursorName);
         isFirstNext = true;
     }
     
-    private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
-        Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
-        for (QueryResult each : queryResults) {
-            QueryResult queryResult = decorate(each, selectStatementContext.getDatabaseType());
-            OrderByValue orderByValue = new OrderByValue(queryResult, items, selectStatementContext, schema);
-            if (orderByValue.next()) {
-                orderByValuesQueue.offer(orderByValue);
-            }
-        }
-        setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
-    }
-    
-    private QueryResult decorate(final QueryResult queryResult, final DatabaseType databaseType) throws SQLException {
-        if (!DirectionType.isAllDirectionType(directionType) && queryResult instanceof JDBCStreamQueryResult) {
-            return new JDBCMemoryQueryResult(((JDBCStreamQueryResult) queryResult).getResultSet(), databaseType);
-        }
-        return queryResult;
-    }
-    
     @Override
     public boolean next() throws SQLException {
         if (orderByValuesQueue.isEmpty()) {
@@ -98,4 +80,34 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
         return DirectionType.isAllDirectionType(directionType) || fetchCount-- > 0;
     }
+    
+    private void addOrderedResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
+        Collection<OrderByItem> items = selectStatementContext.getOrderByContext().getItems();
+        for (QueryResult each : queryResults) {
+            QueryResult queryResult = decorate(each, selectStatementContext.getDatabaseType());
+            OrderByValue orderByValue = new OrderByValue(queryResult, items, selectStatementContext, schema);
+            if (orderByValue.next()) {
+                orderByValuesQueue.offer(orderByValue);
+            }
+        }
+        setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
+    }
+    
+    private QueryResult decorate(final QueryResult queryResult, final DatabaseType databaseType) throws SQLException {
+        if (!DirectionType.isAllDirectionType(directionType) && queryResult instanceof JDBCStreamQueryResult) {
+            return new JDBCMemoryQueryResult(((JDBCStreamQueryResult) queryResult).getResultSet(), databaseType);
+        }
+        return queryResult;
+    }
+    
+    private void mergeRemainingRowCount(final String cursorName) {
+        long remainingRowCount = 0L;
+        for (OrderByValue each : orderByValuesQueue) {
+            if (each.getQueryResult() instanceof JDBCMemoryQueryResult) {
+                remainingRowCount += ((JDBCMemoryQueryResult) each.getQueryResult()).getRowCount();
+            }
+        }
+        remainingRowCount = DirectionType.isAllDirectionType(directionType) ? 0 : remainingRowCount - fetchCount;
+        FetchOrderByValueQueuesHolder.getRemainingRowCounts().put(cursorName, Math.max(remainingRowCount, 0));
+    }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
index b31d48f009e..177179f9224 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchOrderByValueQueuesHolderTest.java
@@ -33,11 +33,15 @@ public final class FetchOrderByValueQueuesHolderTest {
     }
     
     @Test
-    public void assertTrafficContextHolder() {
-        assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
-        FetchOrderByValueQueuesHolder.get().computeIfAbsent("t_order_cursor", key -> new PriorityQueue<>());
-        assertTrue(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+    public void assertFetchOrderByValueQueuesHolder() {
+        assertFalse(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
+        assertFalse(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
+        FetchOrderByValueQueuesHolder.getOrderByValueQueues().computeIfAbsent("t_order_cursor", key -> new PriorityQueue<>());
+        FetchOrderByValueQueuesHolder.getRemainingRowCounts().put("t_order_cursor", 0L);
+        assertTrue(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
+        assertTrue(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
         FetchOrderByValueQueuesHolder.remove();
-        assertFalse(FetchOrderByValueQueuesHolder.get().containsKey("t_order_cursor"));
+        assertFalse(FetchOrderByValueQueuesHolder.getOrderByValueQueues().containsKey("t_order_cursor"));
+        assertFalse(FetchOrderByValueQueuesHolder.getRemainingRowCounts().containsKey("t_order_cursor"));
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java
index f6024989569..043b6c79ce3 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResult.java
@@ -31,6 +31,6 @@ import java.sql.SQLException;
 public final class JDBCMemoryQueryResult extends AbstractMemoryQueryResult {
     
     public JDBCMemoryQueryResult(final ResultSet resultSet, final DatabaseType databaseType) throws SQLException {
-        super(new JDBCQueryResultMetaData(resultSet.getMetaData()), DialectJDBCRowsLoaderFactory.getInstance(databaseType).load(resultSet.getMetaData().getColumnCount(), resultSet).iterator());
+        super(new JDBCQueryResultMetaData(resultSet.getMetaData()), DialectJDBCRowsLoaderFactory.getInstance(databaseType).load(resultSet.getMetaData().getColumnCount(), resultSet));
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java
index 67d542d5ca9..7b06e2bcf43 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/raw/type/RawMemoryQueryResult.java
@@ -29,6 +29,6 @@ import java.util.List;
 public final class RawMemoryQueryResult extends AbstractMemoryQueryResult {
     
     public RawMemoryQueryResult(final QueryResultMetaData metaData, final List<MemoryQueryResultDataRow> rows) {
-        super(metaData, rows.iterator());
+        super(metaData, rows);
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java
index 10ed511ab93..500c1ba5846 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/type/memory/AbstractMemoryQueryResult.java
@@ -17,9 +17,7 @@
 
 package org.apache.shardingsphere.infra.executor.sql.execute.result.query.type.memory;
 
-import lombok.AccessLevel;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData;
@@ -31,12 +29,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.Iterator;
 
 /**
  * Abstract memory query result.
  */
-@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
 public abstract class AbstractMemoryQueryResult implements QueryResult {
     
     @Getter
@@ -46,10 +44,20 @@ public abstract class AbstractMemoryQueryResult implements QueryResult {
     
     private MemoryQueryResultDataRow currentRow;
     
+    @Getter
+    private long rowCount;
+    
+    protected AbstractMemoryQueryResult(final QueryResultMetaData metaData, final Collection<MemoryQueryResultDataRow> rows) {
+        this.metaData = metaData;
+        this.rows = rows.iterator();
+        rowCount = rows.size();
+    }
+    
     @Override
     public final boolean next() {
         if (rows.hasNext()) {
             currentRow = rows.next();
+            rowCount--;
             return true;
         }
         currentRow = null;
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java
index fa89a901068..afad05bcf5c 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/memory/JDBCMemoryQueryResultTest.java
@@ -364,6 +364,16 @@ public final class JDBCMemoryQueryResultTest {
         assertTrue(queryResult.wasNull());
     }
     
+    @Test
+    public void assertGetRowCount() throws SQLException {
+        JDBCMemoryQueryResult queryResult = new JDBCMemoryQueryResult(mockResultSet(), databaseType);
+        assertThat(queryResult.getRowCount(), is(1L));
+        queryResult.next();
+        assertThat(queryResult.getRowCount(), is(0L));
+        queryResult.next();
+        assertThat(queryResult.getRowCount(), is(0L));
+    }
+    
     private ResultSet mockResultSet() throws SQLException {
         ResultSet result = mock(ResultSet.class);
         when(result.next()).thenReturn(true).thenReturn(false);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index 051dcfdb6f9..7fef747d3af 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -101,7 +101,8 @@ public final class SchemaAssignedDatabaseBackendHandler implements DatabaseBacke
             ((CursorDefinitionAware) statementContext).setUpCursorDefinition(cursorStatementContext);
         }
         if (statementContext instanceof CloseStatementContext) {
-            FetchOrderByValueQueuesHolder.get().remove(cursorName);
+            FetchOrderByValueQueuesHolder.getOrderByValueQueues().remove(cursorName);
+            FetchOrderByValueQueuesHolder.getRemainingRowCounts().remove(cursorName);
             connectionSession.getCursorDefinitions().remove(cursorName);
         }
     }