You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/03/01 03:59:22 UTC

[shardingsphere] branch master updated: Support sharding join SQLs using calcite executor (#9534)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f5cf296  Support sharding join SQLs using calcite executor (#9534)
f5cf296 is described below

commit f5cf2963a4a76d3f860bbd42cee96a7a59b5502e
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Mon Mar 1 11:58:48 2021 +0800

    Support sharding join SQLs using calcite executor (#9534)
---
 .../engine/type/ShardingRouteEngineFactory.java    | 30 +++++++++++++++-------
 .../ShardingFederatedRoutingEngine.java}           | 27 +++++++++----------
 .../statement/dml/SelectStatementContext.java      | 10 ++++++++
 .../infra/route/context/RouteContext.java          | 17 ++++++++++++
 4 files changed, 60 insertions(+), 24 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java
index a7b1137..e9119cd 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/ShardingRouteEngineFactory.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.sharding.route.engine.type;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -30,7 +31,7 @@ import org.apache.shardingsphere.sharding.route.engine.type.broadcast.ShardingIn
 import org.apache.shardingsphere.sharding.route.engine.type.broadcast.ShardingTableBroadcastRoutingEngine;
 import org.apache.shardingsphere.sharding.route.engine.type.complex.ShardingComplexRoutingEngine;
 import org.apache.shardingsphere.sharding.route.engine.type.ignore.ShardingIgnoreRoutingEngine;
-import org.apache.shardingsphere.sharding.route.engine.type.single.ShardingSingleRoutingEngine;
+import org.apache.shardingsphere.sharding.route.engine.type.federated.ShardingFederatedRoutingEngine;
 import org.apache.shardingsphere.sharding.route.engine.type.single.SingleTableRoutingEngine;
 import org.apache.shardingsphere.sharding.route.engine.type.standard.ShardingStandardRoutingEngine;
 import org.apache.shardingsphere.sharding.route.engine.type.unicast.ShardingUnicastRoutingEngine;
@@ -63,7 +64,7 @@ public final class ShardingRouteEngineFactory {
     
     /**
      * Create new instance of routing engine.
-     * 
+     *
      * @param shardingRule sharding rule
      * @param metaData ShardingSphere metaData
      * @param sqlStatementContext SQL statement context
@@ -71,7 +72,7 @@ public final class ShardingRouteEngineFactory {
      * @param props ShardingSphere properties
      * @return new instance of routing engine
      */
-    public static ShardingRouteEngine newInstance(final ShardingRule shardingRule, final ShardingSphereMetaData metaData, 
+    public static ShardingRouteEngine newInstance(final ShardingRule shardingRule, final ShardingSphereMetaData metaData,
                                                   final SQLStatementContext<?> sqlStatementContext, final ShardingConditions shardingConditions, final ConfigurationProperties props) {
         SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
         Collection<String> tableNames = sqlStatementContext.getTablesContext().getTableNames();
@@ -82,7 +83,7 @@ public final class ShardingRouteEngineFactory {
             return getDDLRoutingEngine(shardingRule, metaData, sqlStatementContext);
         }
         if (sqlStatement instanceof DALStatement) {
-            return getDALRoutingEngine(shardingRule, metaData, sqlStatement, tableNames);
+            return getDALRoutingEngine(shardingRule, sqlStatement, tableNames);
         }
         if (sqlStatement instanceof DCLStatement) {
             return getDCLRoutingEngine(shardingRule, metaData, sqlStatementContext);
@@ -104,8 +105,7 @@ public final class ShardingRouteEngineFactory {
         return new ShardingTableBroadcastRoutingEngine(metaData.getSchema(), sqlStatementContext);
     }
     
-    private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule shardingRule, final ShardingSphereMetaData metaData, 
-                                                           final SQLStatement sqlStatement, final Collection<String> tableNames) {
+    private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule shardingRule, final SQLStatement sqlStatement, final Collection<String> tableNames) {
         if (sqlStatement instanceof MySQLUseStatement) {
             return new ShardingIgnoreRoutingEngine();
         }
@@ -153,18 +153,30 @@ public final class ShardingRouteEngineFactory {
             return new SingleTableRoutingEngine(tableNames, sqlStatement);
         }
         if (shardingRule.tableRuleExists(tableNames) && shardingRule.singleTableRuleExists(tableNames)) {
-            return new ShardingSingleRoutingEngine(tableNames);
+            return new ShardingFederatedRoutingEngine(tableNames);
         }
-        return getShardingRoutingEngine(shardingRule, shardingConditions, tableNames, props);
+        return getShardingRoutingEngine(shardingRule, shardingConditions, sqlStatementContext, tableNames, props);
     }
     
-    private static ShardingRouteEngine getShardingRoutingEngine(final ShardingRule shardingRule, final ShardingConditions shardingConditions, 
+    private static ShardingRouteEngine getShardingRoutingEngine(final ShardingRule shardingRule, final ShardingConditions shardingConditions,
+                                                                final SQLStatementContext<?> sqlStatementContext,
                                                                 final Collection<String> tableNames, final ConfigurationProperties props) {
         Collection<String> shardingTableNames = shardingRule.getShardingLogicTableNames(tableNames);
         if (1 == shardingTableNames.size() || shardingRule.isAllBindingTables(shardingTableNames)) {
             return new ShardingStandardRoutingEngine(shardingTableNames.iterator().next(), shardingConditions, props);
         }
+        if (isFederatedQuery(sqlStatementContext, tableNames, shardingTableNames)) {
+            return new ShardingFederatedRoutingEngine(tableNames);
+        }
         // TODO config for cartesian set
         return new ShardingComplexRoutingEngine(tableNames, shardingConditions, props);
     }
+    
+    private static boolean isFederatedQuery(final SQLStatementContext<?> sqlStatementContext, final Collection<String> tableNames, final Collection<String> shardingTableNames) {
+        if (!(sqlStatementContext instanceof SelectStatementContext)) {
+            return false;
+        }
+        SelectStatementContext select = (SelectStatementContext) sqlStatementContext;
+        return tableNames.size() == shardingTableNames.size() && (select.isContainsJoinQuery() || select.isContainsSubquery());
+    }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/single/ShardingSingleRoutingEngine.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/federated/ShardingFederatedRoutingEngine.java
similarity index 61%
rename from shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/single/ShardingSingleRoutingEngine.java
rename to shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/federated/ShardingFederatedRoutingEngine.java
index 7ec9aee..47da4e5 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/single/ShardingSingleRoutingEngine.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-route/src/main/java/org/apache/shardingsphere/sharding/route/engine/type/federated/ShardingFederatedRoutingEngine.java
@@ -15,26 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sharding.route.engine.type.single;
+package org.apache.shardingsphere.sharding.route.engine.type.federated;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.route.context.RouteContext;
 import org.apache.shardingsphere.infra.route.context.RouteMapper;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
 import org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngine;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.TableRule;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
 
 /**
- * Sharding single routing engine.
+ * Sharding federated routing engine.
  */
 @RequiredArgsConstructor
-public final class ShardingSingleRoutingEngine implements ShardingRouteEngine {
+public final class ShardingFederatedRoutingEngine implements ShardingRouteEngine {
     
     private final Collection<String> logicTables;
     
@@ -42,23 +39,23 @@ public final class ShardingSingleRoutingEngine implements ShardingRouteEngine {
     public void route(final RouteContext routeContext, final ShardingRule shardingRule) {
         for (String each : logicTables) {
             if (shardingRule.getSingleTableRules().containsKey(each)) {
-                String datasource = shardingRule.getSingleTableRules().get(each).getDataSourceName();
-                RouteUnit unit = new RouteUnit(new RouteMapper(datasource, datasource), Collections.singletonList(new RouteMapper(each, each)));
-                routeContext.getRouteUnits().add(unit);
+                String dataSourceName = shardingRule.getSingleTableRules().get(each).getDataSourceName();
+                RouteMapper dataSource = new RouteMapper(dataSourceName, dataSourceName);
+                RouteMapper table = new RouteMapper(each, each);
+                routeContext.putRouteUnit(dataSource, table);
             } else {
-                routeContext.getRouteUnits().addAll(getAllRouteUnits(shardingRule, each));
+                fillRouteContext(routeContext, shardingRule, each);
             }
         }
         routeContext.setToCalcite(true);
     }
     
-    private Collection<RouteUnit> getAllRouteUnits(final ShardingRule shardingRule, final String logicTableName) {
-        Collection<RouteUnit> result = new LinkedList<>();
+    private void fillRouteContext(final RouteContext routeContext, final ShardingRule shardingRule, final String logicTableName) {
         TableRule tableRule = shardingRule.getTableRule(logicTableName);
         for (DataNode each : tableRule.getActualDataNodes()) {
-            RouteUnit routeUnit = new RouteUnit(new RouteMapper(each.getDataSourceName(), each.getDataSourceName()), Collections.singletonList(new RouteMapper(logicTableName, each.getTableName())));
-            result.add(routeUnit);
+            RouteMapper dataSource = new RouteMapper(each.getDataSourceName(), each.getDataSourceName());
+            RouteMapper table = new RouteMapper(logicTableName, each.getTableName());
+            routeContext.putRouteUnit(dataSource, table);
         }
-        return result;
     }
 }
diff --git a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java
index 2b8ea7d..00aa1dd 100644
--- a/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-binder/src/main/java/org/apache/shardingsphere/infra/binder/statement/dml/SelectStatementContext.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.order.item.Ex
 import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.order.item.IndexOrderByItemSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.order.item.TextOrderByItemSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.predicate.WhereSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.JoinTableSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
@@ -102,6 +103,15 @@ public final class SelectStatementContext extends CommonSQLStatementContext<Sele
     }
     
     /**
+     * Whether it contain join query.
+     *
+     * @return contain join query or not
+     */
+    public boolean isContainsJoinQuery() {
+        return getSqlStatement().getFrom() instanceof JoinTableSegment;
+    }
+    
+    /**
      * Set indexes.
      *
      * @param columnLabelIndexMap map for column label and index
diff --git a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
index 7b05ad7..0ed5ff7 100644
--- a/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
+++ b/shardingsphere-infra/shardingsphere-infra-route/src/main/java/org/apache/shardingsphere/infra/route/context/RouteContext.java
@@ -135,4 +135,21 @@ public final class RouteContext {
         }
         return Optional.empty();
     }
+    
+    /**
+     * Put route unit.
+     *
+     * @param dataSourceMapper database mapper
+     * @param tableMapper table mapper
+     */
+    public void putRouteUnit(final RouteMapper dataSourceMapper, final RouteMapper tableMapper) {
+        Optional<RouteUnit> target = routeUnits.stream().filter(unit -> unit.getDataSourceMapper().equals(dataSourceMapper)).findFirst();
+        RouteUnit unit = new RouteUnit(dataSourceMapper, new LinkedHashSet<>());
+        if (target.isPresent()) {
+            unit.getTableMappers().addAll(target.get().getTableMappers());
+            routeUnits.remove(target.get());
+        }
+        unit.getTableMappers().add(tableMapper);
+        routeUnits.add(unit);
+    }
 }