You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/11/01 07:51:38 UTC

[shardingsphere] branch master updated: Optimize AdvancedSQLFederationExecutor logic (#21879)

This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b832ef23c Optimize AdvancedSQLFederationExecutor logic (#21879)
f1b832ef23c is described below

commit f1b832ef23ce405011c3ba8ca1472f5991403cde
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Tue Nov 1 15:51:32 2022 +0800

    Optimize AdvancedSQLFederationExecutor logic (#21879)
---
 .../sqlfederation/spi/SQLFederationExecutor.java   |  4 +--
 .../advanced/AdvancedSQLFederationExecutor.java    |  8 +++---
 .../executor/FilterableTableScanExecutor.java      |  1 -
 .../executor/TranslatableTableScanExecutor.java    | 26 +++++++++++++++++++
 .../original/OriginalSQLFederationExecutor.java    |  8 +++---
 .../translatable/TranslatableDatabase.java         |  4 +--
 .../metadata/translatable/TranslatableSchema.java  | 29 +++++++++++++++++-----
 .../optimizer/SQLOptimizeEngineTest.java           |  2 +-
 8 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java b/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java
index 6e60fc4f8d2..f295a5ad260 100644
--- a/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java
+++ b/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java
@@ -42,11 +42,11 @@ public interface SQLFederationExecutor extends TypedSPI, RequiredSPI, AutoClosea
      * @param databaseName database name
      * @param schemaName schema name
      * @param metaData ShardingSphere meta data
-     * @param shardingSphereData ShardingSphere data
+     * @param data ShardingSphere data
      * @param jdbcExecutor jdbc executor
      * @param eventBusContext event bus context
      */
-    void init(String databaseName, String schemaName, ShardingSphereMetaData metaData, ShardingSphereData shardingSphereData, JDBCExecutor jdbcExecutor, EventBusContext eventBusContext);
+    void init(String databaseName, String schemaName, ShardingSphereMetaData metaData, ShardingSphereData data, JDBCExecutor jdbcExecutor, EventBusContext eventBusContext);
     
     /**
      * Execute query.
diff --git a/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java b/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java
index 09e1e6f7964..89c7de23454 100644
--- a/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java
+++ b/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java
@@ -83,7 +83,7 @@ public final class AdvancedSQLFederationExecutor implements SQLFederationExecuto
     
     private ConfigurationProperties props;
     
-    private ShardingSphereData shardingSphereData;
+    private ShardingSphereData data;
     
     private JDBCExecutor jdbcExecutor;
     
@@ -92,14 +92,14 @@ public final class AdvancedSQLFederationExecutor implements SQLFederationExecuto
     private ResultSet resultSet;
     
     @Override
-    public void init(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, final ShardingSphereData shardingSphereData,
+    public void init(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, final ShardingSphereData data,
                      final JDBCExecutor jdbcExecutor, final EventBusContext eventBusContext) {
         this.databaseName = databaseName;
         this.schemaName = schemaName;
         this.optimizerContext = OptimizerContextFactory.create(metaData.getDatabases(), metaData.getGlobalRuleMetaData());
         this.globalRuleMetaData = metaData.getGlobalRuleMetaData();
         this.props = metaData.getProps();
-        this.shardingSphereData = shardingSphereData;
+        this.data = data;
         this.jdbcExecutor = jdbcExecutor;
         this.eventBusContext = eventBusContext;
     }
@@ -129,7 +129,7 @@ public final class AdvancedSQLFederationExecutor implements SQLFederationExecuto
                                                      final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationExecutorContext federationContext) {
         TableScanExecutorContext executorContext = new TableScanExecutorContext(databaseName, schemaName, props, federationContext);
         // TODO replace FilterableTableScanExecutor with TranslatableTableScanExecutor
-        TableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData, executorContext, shardingSphereData, eventBusContext);
+        TableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData, executorContext, data, eventBusContext);
         // TODO replace FilterableSchema with TranslatableSchema
         return new FilterableSchema(schemaName, schema, JAVA_TYPE_FACTORY, executor);
     }
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
index 89ea25561ea..5bfc92793ef 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
@@ -174,7 +174,6 @@ public final class FilterableTableScanExecutor implements TableScanExecutor {
         Optional<ShardingSphereTableData> tableData = Optional.ofNullable(shardingSphereData.getDatabaseData().get(databaseName)).map(optional -> optional.getSchemaData().get(schemaName))
                 .map(ShardingSphereSchemaData::getTableData).map(shardingSphereData -> shardingSphereData.get(table.getName()));
         return tableData.map(this::createMemoryEnumerator).orElseGet(this::createEmptyEnumerable);
-        
     }
     
     private Enumerable<Object[]> createMemoryEnumerator(final ShardingSphereTableData tableData) {
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
index 0d7faa218f6..c5851bb85fd 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
@@ -60,6 +60,9 @@ import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
 import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
@@ -77,6 +80,7 @@ import org.apache.shardingsphere.sqlfederation.optimizer.metadata.filter.Filtera
 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.translatable.StringToRexNodeUtil;
 import org.apache.shardingsphere.sqlfederation.optimizer.util.SQLFederationPlannerUtil;
 import org.apache.shardingsphere.sqlfederation.row.EmptyRowEnumerator;
+import org.apache.shardingsphere.sqlfederation.row.MemoryEnumerator;
 import org.apache.shardingsphere.sqlfederation.row.SQLFederationRowEnumerator;
 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
 
@@ -90,6 +94,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
@@ -112,6 +117,8 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
     
     private final TableScanExecutorContext executorContext;
     
+    private final ShardingSphereData data;
+    
     private final EventBusContext eventBusContext;
     
     @Override
@@ -129,6 +136,9 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
             federationContext.getExecutionUnits().addAll(context.getExecutionUnits());
             return createEmptyEnumerable();
         }
+        if (databaseType.getSystemSchemas().contains(schemaName)) {
+            return executeByShardingSphereData(databaseName, schemaName, table);
+        }
         return execute(databaseType, queryContext, database, context);
     }
     
@@ -162,6 +172,22 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
         return result;
     }
     
+    private Enumerable<Object[]> executeByShardingSphereData(final String databaseName, final String schemaName, final ShardingSphereTable table) {
+        Optional<ShardingSphereTableData> tableData = Optional.ofNullable(data.getDatabaseData().get(databaseName)).map(optional -> optional.getSchemaData().get(schemaName))
+                .map(ShardingSphereSchemaData::getTableData).map(shardingSphereData -> shardingSphereData.get(table.getName()));
+        return tableData.map(this::createMemoryEnumerator).orElseGet(this::createEmptyEnumerable);
+    }
+    
+    private Enumerable<Object[]> createMemoryEnumerator(final ShardingSphereTableData tableData) {
+        return new AbstractEnumerable<Object[]>() {
+            
+            @Override
+            public Enumerator<Object[]> enumerator() {
+                return new MemoryEnumerator(tableData.getRows());
+            }
+        };
+    }
+    
     private Collection<Statement> getStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
         Collection<Statement> result = new LinkedList<>();
         for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
diff --git a/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java b/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java
index 363a8693521..260f0c74d3f 100644
--- a/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java
+++ b/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java
@@ -70,7 +70,7 @@ public final class OriginalSQLFederationExecutor implements SQLFederationExecuto
     
     private ConfigurationProperties props;
     
-    private ShardingSphereData shardingSphereData;
+    private ShardingSphereData data;
     
     private JDBCExecutor jdbcExecutor;
     
@@ -90,12 +90,12 @@ public final class OriginalSQLFederationExecutor implements SQLFederationExecuto
     
     @Override
     public void init(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData,
-                     final ShardingSphereData shardingSphereData, final JDBCExecutor jdbcExecutor, final EventBusContext eventBusContext) {
+                     final ShardingSphereData data, final JDBCExecutor jdbcExecutor, final EventBusContext eventBusContext) {
         this.databaseName = databaseName;
         this.schemaName = schemaName;
         this.optimizerContext = OptimizerContextFactory.create(metaData.getDatabases(), metaData.getGlobalRuleMetaData());
         this.globalRuleMetaData = metaData.getGlobalRuleMetaData();
-        this.shardingSphereData = shardingSphereData;
+        this.data = data;
         this.props = metaData.getProps();
         this.jdbcExecutor = jdbcExecutor;
         this.eventBusContext = eventBusContext;
@@ -122,7 +122,7 @@ public final class OriginalSQLFederationExecutor implements SQLFederationExecuto
                            final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationExecutorContext federationContext) throws SQLException {
         TableScanExecutorContext executorContext = new TableScanExecutorContext(databaseName, schemaName, props, federationContext);
         FilterableTableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData,
-                executorContext, shardingSphereData, eventBusContext);
+                executorContext, data, eventBusContext);
         FilterableDatabase database = new FilterableDatabase(federationContext.getDatabases().get(databaseName.toLowerCase()), JAVA_TYPE_FACTORY, executor);
         // TODO support database.schema.table query when switch to AdvancedFederationExecutor, calcite jdbc just support schema.table query now
         connection.getRootSchema().add(schemaName, database.getSubSchema(schemaName));
diff --git a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java
index 53b845ab21c..e602275f515 100644
--- a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java
+++ b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.sqlfederation.optimizer.metadata.translatable;
 import lombok.Getter;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.impl.AbstractSchema;
-import org.apache.shardingsphere.sqlfederation.optimizer.executor.TableScanExecutor;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.sqlfederation.optimizer.executor.TableScanExecutor;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -46,7 +46,7 @@ public final class TranslatableDatabase extends AbstractSchema {
     private Map<String, Schema> createSubSchemaMap(final ShardingSphereDatabase database, final TableScanExecutor executor) {
         Map<String, Schema> result = new LinkedHashMap<>(database.getSchemas().size(), 1);
         for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
-            result.put(entry.getKey(), new TranslatableSchema(entry.getKey(), entry.getValue(), executor));
+            result.put(entry.getKey(), new TranslatableSchema(entry.getKey(), entry.getValue(), null, executor));
         }
         return result;
     }
diff --git a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java
index a0f4cda48bc..77dbe31e9b5 100644
--- a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java
+++ b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java
@@ -18,18 +18,25 @@
 package org.apache.shardingsphere.sqlfederation.optimizer.metadata.translatable;
 
 import lombok.Getter;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.ViewTable;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
 import org.apache.shardingsphere.sqlfederation.optimizer.executor.TableScanExecutor;
 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.statistic.FederationStatistic;
+import org.apache.shardingsphere.sqlfederation.optimizer.util.SQLFederationDataTypeUtil;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
- * Filterable schema.
+ * Translatable schema.
  */
 @Getter
 public final class TranslatableSchema extends AbstractSchema {
@@ -38,17 +45,27 @@ public final class TranslatableSchema extends AbstractSchema {
     
     private final Map<String, Table> tableMap;
     
-    public TranslatableSchema(final String schemaName, final ShardingSphereSchema schema, final TableScanExecutor executor) {
+    public TranslatableSchema(final String schemaName, final ShardingSphereSchema schema, final JavaTypeFactory javaTypeFactory, final TableScanExecutor executor) {
         name = schemaName;
-        tableMap = createTableMap(schema, executor);
+        tableMap = createTableMap(schema, javaTypeFactory, executor);
     }
     
-    private Map<String, Table> createTableMap(final ShardingSphereSchema schema, final TableScanExecutor executor) {
+    private Map<String, Table> createTableMap(final ShardingSphereSchema schema, final JavaTypeFactory javaTypeFactory, final TableScanExecutor executor) {
         Map<String, Table> result = new LinkedHashMap<>(schema.getTables().size(), 1);
         for (ShardingSphereTable each : schema.getTables().values()) {
-            // TODO implement table statistic logic after using custom operators
-            result.put(each.getName(), new FederationTranslatableTable(each, executor, new FederationStatistic()));
+            if (schema.containsView(each.getName())) {
+                result.put(each.getName(), getViewTable(schema, each, javaTypeFactory));
+            } else {
+                // TODO implement table statistic logic after using custom operators
+                result.put(each.getName(), new FederationTranslatableTable(each, executor, new FederationStatistic()));
+            }
         }
         return result;
     }
+    
+    private static ViewTable getViewTable(final ShardingSphereSchema schema, final ShardingSphereTable table, final JavaTypeFactory javaTypeFactory) {
+        RelDataType relDataType = SQLFederationDataTypeUtil.createRelDataType(table, javaTypeFactory);
+        ShardingSphereView view = schema.getView(table.getName());
+        return new ViewTable(javaTypeFactory.getJavaClass(relDataType), RelDataTypeImpl.proto(relDataType), view.getViewDefinition(), Collections.emptyList(), Collections.emptyList());
+    }
 }
diff --git a/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java b/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java
index 56a73299af1..da81b1c82a7 100644
--- a/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java
+++ b/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java
@@ -126,7 +126,7 @@ public final class SQLOptimizeEngineTest {
     private SqlToRelConverter createSqlToRelConverter(final ShardingSphereSchema schema) {
         CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(new Properties());
         RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
-        TranslatableSchema federationSchema = new TranslatableSchema(SCHEMA_NAME, schema, null);
+        TranslatableSchema federationSchema = new TranslatableSchema(SCHEMA_NAME, schema, new JavaTypeFactoryImpl(), null);
         CalciteCatalogReader catalogReader = SQLFederationPlannerUtil.createCatalogReader(SCHEMA_NAME, federationSchema, relDataTypeFactory, connectionConfig);
         H2DatabaseType databaseType = new H2DatabaseType();
         SqlValidator validator = SQLFederationPlannerUtil.createSqlValidator(catalogReader, relDataTypeFactory, databaseType, connectionConfig);