You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/06 12:39:41 UTC

[shardingsphere] branch master updated: Optimize fetech statement merge logic (#18200)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a63800ddb45 Optimize fetech statement merge logic (#18200)
a63800ddb45 is described below

commit a63800ddb4513f7f9b79c00830507b58fe2101bc
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Mon Jun 6 20:39:35 2022 +0800

    Optimize fetech statement merge logic (#18200)
---
 .../merge/ddl/ShardingDDLResultMerger.java         |  6 +--
 .../merge/ddl/fetch/FetchStreamMergedResult.java   | 29 +++++++++-----
 .../parser/sql/common/constant/DirectionType.java  | 12 ++++--
 .../sql/common/constant/DirectionTypeTest.java     | 44 ++++++++++++++++++++++
 4 files changed, 75 insertions(+), 16 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
index a929d8f6087..0ec6784dcf1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
@@ -50,10 +50,10 @@ public final class ShardingDDLResultMerger implements ResultMerger {
         if (1 == queryResults.size()) {
             return new IteratorStreamMergedResult(queryResults);
         }
+        FetchStatementContext fetchStatementContext = (FetchStatementContext) sqlStatementContext;
         Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
-        FetchStatementContext statementContext = (FetchStatementContext) sqlStatementContext;
-        statementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
-        return new FetchStreamMergedResult(queryResults, statementContext, getSchema(sqlStatementContext, database));
+        fetchStatementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
+        return new FetchStreamMergedResult(queryResults, fetchStatementContext, getSchema(sqlStatementContext, database));
     }
     
     private ShardingSphereSchema getSchema(final SQLStatementContext<?> sqlStatementContext, final ShardingSphereDatabase database) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 7c6b7e99298..a5fb82256a1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -17,16 +17,16 @@
 
 package org.apache.shardingsphere.sharding.merge.ddl.fetch;
 
-import org.apache.shardingsphere.infra.binder.segment.select.orderby.OrderByItem;
 import org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
 
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
@@ -36,28 +36,33 @@ import java.util.Queue;
  */
 public final class FetchStreamMergedResult extends StreamMergedResult {
     
-    private final Collection<OrderByItem> orderByItems;
-    
     private final Queue<OrderByValue> orderByValuesQueue;
     
+    private final DirectionType directionType;
+    
+    private long fetchCount;
+    
     private boolean isFirstNext;
     
     public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema) throws SQLException {
         String cursorName = fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
         SelectStatementContext selectStatementContext = fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
-        orderByItems = selectStatementContext.getOrderByContext().getItems();
         orderByValuesQueue = FetchOrderByValueQueuesHolder.get().computeIfAbsent(cursorName, key -> new PriorityQueue<>(queryResults.size()));
         orderResultSetsToQueue(queryResults, selectStatementContext, schema);
+        directionType = fetchStatementContext.getSqlStatement().getDirection().map(DirectionSegment::getDirectionType).orElse(DirectionType.NEXT);
+        fetchCount = fetchStatementContext.getSqlStatement().getDirection().flatMap(DirectionSegment::getCount).orElse(1L);
         isFirstNext = true;
     }
     
     private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
+        
         for (QueryResult each : queryResults) {
-            OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schema);
+            OrderByValue orderByValue = new OrderByValue(each, selectStatementContext.getOrderByContext().getItems(), selectStatementContext, schema);
             if (orderByValue.next()) {
                 orderByValuesQueue.offer(orderByValue);
             }
         }
+        setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
     }
     
     @Override
@@ -65,13 +70,19 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
         if (orderByValuesQueue.isEmpty()) {
             return false;
         }
-        // TODO support fetch count and fetch all statement
         if (isFirstNext) {
-            setCurrentQueryResult(orderByValuesQueue.poll().getQueryResult());
             isFirstNext = false;
+            fetchCount--;
             return true;
-        } else {
+        }
+        OrderByValue firstOrderByValue = orderByValuesQueue.poll();
+        if (firstOrderByValue.next()) {
+            orderByValuesQueue.offer(firstOrderByValue);
+        }
+        if (orderByValuesQueue.isEmpty()) {
             return false;
         }
+        setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
+        return DirectionType.isAllDirectionType(directionType) || fetchCount-- > 0;
     }
 }
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
index 20b1374b5ff..c97d418149a 100644
--- a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.sql.parser.sql.common.constant;
 
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Direction type enum.
@@ -26,13 +28,15 @@ public enum DirectionType {
     
     NEXT, PRIOR, FIRST, LAST, ABSOLUTE_COUNT, RELATIVE_COUNT, COUNT, ALL, FORWARD, FORWARD_COUNT, FORWARD_ALL, BACKWARD, BACKWARD_COUNT, BACKWARD_ALL;
     
+    private static final Set<DirectionType> ALL_DIRECTION_TYPES = new HashSet<>(Arrays.asList(ALL, FORWARD_ALL, BACKWARD_ALL));
+    
     /**
-     * Is direction type.
+     * Is all direction type.
      * 
      * @param directionType direction type
-     * @return is direction type or not
+     * @return is all direction type or not
      */
-    public static boolean isAggregationType(final String directionType) {
-        return Arrays.stream(values()).anyMatch(each -> directionType.equalsIgnoreCase(each.name()));
+    public static boolean isAllDirectionType(final DirectionType directionType) {
+        return ALL_DIRECTION_TYPES.contains(directionType);
     }
 }
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java
new file mode 100644
index 00000000000..8799347256c
--- /dev/null
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.sql.common.constant;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public final class DirectionTypeTest {
+    
+    @Test
+    public void assertIsAggregationType() {
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.NEXT));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.PRIOR));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.FIRST));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.LAST));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.ABSOLUTE_COUNT));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.RELATIVE_COUNT));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.COUNT));
+        assertTrue(DirectionType.isAllDirectionType(DirectionType.ALL));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.FORWARD));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.FORWARD_COUNT));
+        assertTrue(DirectionType.isAllDirectionType(DirectionType.FORWARD_ALL));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.BACKWARD));
+        assertFalse(DirectionType.isAllDirectionType(DirectionType.BACKWARD_COUNT));
+        assertTrue(DirectionType.isAllDirectionType(DirectionType.BACKWARD_ALL));
+    }
+}