You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/06 12:39:41 UTC
[shardingsphere] branch master updated: Optimize fetch statement merge logic (#18200)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a63800ddb45 Optimize fetch statement merge logic (#18200)
a63800ddb45 is described below
commit a63800ddb4513f7f9b79c00830507b58fe2101bc
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Mon Jun 6 20:39:35 2022 +0800
Optimize fetch statement merge logic (#18200)
---
.../merge/ddl/ShardingDDLResultMerger.java | 6 +--
.../merge/ddl/fetch/FetchStreamMergedResult.java | 29 +++++++++-----
.../parser/sql/common/constant/DirectionType.java | 12 ++++--
.../sql/common/constant/DirectionTypeTest.java | 44 ++++++++++++++++++++++
4 files changed, 75 insertions(+), 16 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
index a929d8f6087..0ec6784dcf1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/ShardingDDLResultMerger.java
@@ -50,10 +50,10 @@ public final class ShardingDDLResultMerger implements ResultMerger {
if (1 == queryResults.size()) {
return new IteratorStreamMergedResult(queryResults);
}
+ FetchStatementContext fetchStatementContext = (FetchStatementContext) sqlStatementContext;
Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
- FetchStatementContext statementContext = (FetchStatementContext) sqlStatementContext;
- statementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
- return new FetchStreamMergedResult(queryResults, statementContext, getSchema(sqlStatementContext, database));
+ fetchStatementContext.getCursorStatementContext().getSelectStatementContext().setIndexes(columnLabelIndexMap);
+ return new FetchStreamMergedResult(queryResults, fetchStatementContext, getSchema(sqlStatementContext, database));
}
private ShardingSphereSchema getSchema(final SQLStatementContext<?> sqlStatementContext, final ShardingSphereDatabase database) {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
index 7c6b7e99298..a5fb82256a1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/merge/ddl/fetch/FetchStreamMergedResult.java
@@ -17,16 +17,16 @@
package org.apache.shardingsphere.sharding.merge.ddl.fetch;
-import org.apache.shardingsphere.infra.binder.segment.select.orderby.OrderByItem;
import org.apache.shardingsphere.infra.binder.statement.ddl.FetchStatementContext;
import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.merge.result.impl.stream.StreamMergedResult;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.sharding.merge.dql.orderby.OrderByValue;
+import org.apache.shardingsphere.sql.parser.sql.common.constant.DirectionType;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.cursor.DirectionSegment;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
@@ -36,28 +36,33 @@ import java.util.Queue;
*/
public final class FetchStreamMergedResult extends StreamMergedResult {
- private final Collection<OrderByItem> orderByItems;
-
private final Queue<OrderByValue> orderByValuesQueue;
+ private final DirectionType directionType;
+
+ private long fetchCount;
+
private boolean isFirstNext;
public FetchStreamMergedResult(final List<QueryResult> queryResults, final FetchStatementContext fetchStatementContext, final ShardingSphereSchema schema) throws SQLException {
String cursorName = fetchStatementContext.getCursorName().getIdentifier().getValue().toLowerCase();
SelectStatementContext selectStatementContext = fetchStatementContext.getCursorStatementContext().getSelectStatementContext();
- orderByItems = selectStatementContext.getOrderByContext().getItems();
orderByValuesQueue = FetchOrderByValueQueuesHolder.get().computeIfAbsent(cursorName, key -> new PriorityQueue<>(queryResults.size()));
orderResultSetsToQueue(queryResults, selectStatementContext, schema);
+ directionType = fetchStatementContext.getSqlStatement().getDirection().map(DirectionSegment::getDirectionType).orElse(DirectionType.NEXT);
+ fetchCount = fetchStatementContext.getSqlStatement().getDirection().flatMap(DirectionSegment::getCount).orElse(1L);
isFirstNext = true;
}
private void orderResultSetsToQueue(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final ShardingSphereSchema schema) throws SQLException {
+
for (QueryResult each : queryResults) {
- OrderByValue orderByValue = new OrderByValue(each, orderByItems, selectStatementContext, schema);
+ OrderByValue orderByValue = new OrderByValue(each, selectStatementContext.getOrderByContext().getItems(), selectStatementContext, schema);
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
+ setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}
@Override
@@ -65,13 +70,19 @@ public final class FetchStreamMergedResult extends StreamMergedResult {
if (orderByValuesQueue.isEmpty()) {
return false;
}
- // TODO support fetch count and fetch all statement
if (isFirstNext) {
- setCurrentQueryResult(orderByValuesQueue.poll().getQueryResult());
isFirstNext = false;
+ fetchCount--;
return true;
- } else {
+ }
+ OrderByValue firstOrderByValue = orderByValuesQueue.poll();
+ if (firstOrderByValue.next()) {
+ orderByValuesQueue.offer(firstOrderByValue);
+ }
+ if (orderByValuesQueue.isEmpty()) {
return false;
}
+ setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
+ return DirectionType.isAllDirectionType(directionType) || fetchCount-- > 0;
}
}
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
index 20b1374b5ff..c97d418149a 100644
--- a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/main/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionType.java
@@ -18,6 +18,8 @@
package org.apache.shardingsphere.sql.parser.sql.common.constant;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
/**
* Direction type enum.
@@ -26,13 +28,15 @@ public enum DirectionType {
NEXT, PRIOR, FIRST, LAST, ABSOLUTE_COUNT, RELATIVE_COUNT, COUNT, ALL, FORWARD, FORWARD_COUNT, FORWARD_ALL, BACKWARD, BACKWARD_COUNT, BACKWARD_ALL;
+ private static final Set<DirectionType> ALL_DIRECTION_TYPES = new HashSet<>(Arrays.asList(ALL, FORWARD_ALL, BACKWARD_ALL));
+
/**
- * Is direction type.
+ * Is all direction type.
*
* @param directionType direction type
- * @return is direction type or not
+ * @return is all direction type or not
*/
- public static boolean isAggregationType(final String directionType) {
- return Arrays.stream(values()).anyMatch(each -> directionType.equalsIgnoreCase(each.name()));
+ public static boolean isAllDirectionType(final DirectionType directionType) {
+ return ALL_DIRECTION_TYPES.contains(directionType);
}
}
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java
new file mode 100644
index 00000000000..8799347256c
--- /dev/null
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-statement/src/test/java/org/apache/shardingsphere/sql/parser/sql/common/constant/DirectionTypeTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sql.parser.sql.common.constant;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public final class DirectionTypeTest {
+ /** Checks that isAllDirectionType is true only for ALL, FORWARD_ALL and BACKWARD_ALL. */
+ @Test
+ public void assertIsAllDirectionType() {
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.NEXT));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.PRIOR));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.FIRST));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.LAST));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.ABSOLUTE_COUNT));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.RELATIVE_COUNT));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.COUNT));
+ assertTrue(DirectionType.isAllDirectionType(DirectionType.ALL));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.FORWARD));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.FORWARD_COUNT));
+ assertTrue(DirectionType.isAllDirectionType(DirectionType.FORWARD_ALL));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.BACKWARD));
+ assertFalse(DirectionType.isAllDirectionType(DirectionType.BACKWARD_COUNT));
+ assertTrue(DirectionType.isAllDirectionType(DirectionType.BACKWARD_ALL));
+ }
+}