You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by tu...@apache.org on 2022/11/01 07:51:38 UTC
[shardingsphere] branch master updated: Optimize AdvancedSQLFederationExecutor logic (#21879)
This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f1b832ef23c Optimize AdvancedSQLFederationExecutor logic (#21879)
f1b832ef23c is described below
commit f1b832ef23ce405011c3ba8ca1472f5991403cde
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Tue Nov 1 15:51:32 2022 +0800
Optimize AdvancedSQLFederationExecutor logic (#21879)
---
.../sqlfederation/spi/SQLFederationExecutor.java | 4 +--
.../advanced/AdvancedSQLFederationExecutor.java | 8 +++---
.../executor/FilterableTableScanExecutor.java | 1 -
.../executor/TranslatableTableScanExecutor.java | 26 +++++++++++++++++++
.../original/OriginalSQLFederationExecutor.java | 8 +++---
.../translatable/TranslatableDatabase.java | 4 +--
.../metadata/translatable/TranslatableSchema.java | 29 +++++++++++++++++-----
.../optimizer/SQLOptimizeEngineTest.java | 2 +-
8 files changed, 62 insertions(+), 20 deletions(-)
diff --git a/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java b/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java
index 6e60fc4f8d2..f295a5ad260 100644
--- a/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java
+++ b/kernel/sql-federation/api/src/main/java/org/apache/shardingsphere/sqlfederation/spi/SQLFederationExecutor.java
@@ -42,11 +42,11 @@ public interface SQLFederationExecutor extends TypedSPI, RequiredSPI, AutoClosea
* @param databaseName database name
* @param schemaName schema name
* @param metaData ShardingSphere meta data
- * @param shardingSphereData ShardingSphere data
+ * @param data ShardingSphere data
* @param jdbcExecutor jdbc executor
* @param eventBusContext event bus context
*/
- void init(String databaseName, String schemaName, ShardingSphereMetaData metaData, ShardingSphereData shardingSphereData, JDBCExecutor jdbcExecutor, EventBusContext eventBusContext);
+ void init(String databaseName, String schemaName, ShardingSphereMetaData metaData, ShardingSphereData data, JDBCExecutor jdbcExecutor, EventBusContext eventBusContext);
/**
* Execute query.
diff --git a/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java b/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java
index 09e1e6f7964..89c7de23454 100644
--- a/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java
+++ b/kernel/sql-federation/executor/advanced/src/main/java/org/apache/shardingsphere/sqlfederation/advanced/AdvancedSQLFederationExecutor.java
@@ -83,7 +83,7 @@ public final class AdvancedSQLFederationExecutor implements SQLFederationExecuto
private ConfigurationProperties props;
- private ShardingSphereData shardingSphereData;
+ private ShardingSphereData data;
private JDBCExecutor jdbcExecutor;
@@ -92,14 +92,14 @@ public final class AdvancedSQLFederationExecutor implements SQLFederationExecuto
private ResultSet resultSet;
@Override
- public void init(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, final ShardingSphereData shardingSphereData,
+ public void init(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData, final ShardingSphereData data,
final JDBCExecutor jdbcExecutor, final EventBusContext eventBusContext) {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.optimizerContext = OptimizerContextFactory.create(metaData.getDatabases(), metaData.getGlobalRuleMetaData());
this.globalRuleMetaData = metaData.getGlobalRuleMetaData();
this.props = metaData.getProps();
- this.shardingSphereData = shardingSphereData;
+ this.data = data;
this.jdbcExecutor = jdbcExecutor;
this.eventBusContext = eventBusContext;
}
@@ -129,7 +129,7 @@ public final class AdvancedSQLFederationExecutor implements SQLFederationExecuto
final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationExecutorContext federationContext) {
TableScanExecutorContext executorContext = new TableScanExecutorContext(databaseName, schemaName, props, federationContext);
// TODO replace FilterableTableScanExecutor with TranslatableTableScanExecutor
- TableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData, executorContext, shardingSphereData, eventBusContext);
+ TableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData, executorContext, data, eventBusContext);
// TODO replace FilterableSchema with TranslatableSchema
return new FilterableSchema(schemaName, schema, JAVA_TYPE_FACTORY, executor);
}
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
index 89ea25561ea..5bfc92793ef 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/FilterableTableScanExecutor.java
@@ -174,7 +174,6 @@ public final class FilterableTableScanExecutor implements TableScanExecutor {
Optional<ShardingSphereTableData> tableData = Optional.ofNullable(shardingSphereData.getDatabaseData().get(databaseName)).map(optional -> optional.getSchemaData().get(schemaName))
.map(ShardingSphereSchemaData::getTableData).map(shardingSphereData -> shardingSphereData.get(table.getName()));
return tableData.map(this::createMemoryEnumerator).orElseGet(this::createEmptyEnumerable);
-
}
private Enumerable<Object[]> createMemoryEnumerator(final ShardingSphereTableData tableData) {
diff --git a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
index 0d7faa218f6..c5851bb85fd 100644
--- a/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
+++ b/kernel/sql-federation/executor/core/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TranslatableTableScanExecutor.java
@@ -60,6 +60,9 @@ import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereSchemaData;
+import org.apache.shardingsphere.infra.metadata.data.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
@@ -77,6 +80,7 @@ import org.apache.shardingsphere.sqlfederation.optimizer.metadata.filter.Filtera
import org.apache.shardingsphere.sqlfederation.optimizer.metadata.translatable.StringToRexNodeUtil;
import org.apache.shardingsphere.sqlfederation.optimizer.util.SQLFederationPlannerUtil;
import org.apache.shardingsphere.sqlfederation.row.EmptyRowEnumerator;
+import org.apache.shardingsphere.sqlfederation.row.MemoryEnumerator;
import org.apache.shardingsphere.sqlfederation.row.SQLFederationRowEnumerator;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
@@ -90,6 +94,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
@@ -112,6 +117,8 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
private final TableScanExecutorContext executorContext;
+ private final ShardingSphereData data;
+
private final EventBusContext eventBusContext;
@Override
@@ -129,6 +136,9 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
federationContext.getExecutionUnits().addAll(context.getExecutionUnits());
return createEmptyEnumerable();
}
+ if (databaseType.getSystemSchemas().contains(schemaName)) {
+ return executeByShardingSphereData(databaseName, schemaName, table);
+ }
return execute(databaseType, queryContext, database, context);
}
@@ -162,6 +172,22 @@ public final class TranslatableTableScanExecutor implements TableScanExecutor {
return result;
}
+ private Enumerable<Object[]> executeByShardingSphereData(final String databaseName, final String schemaName, final ShardingSphereTable table) {
+ Optional<ShardingSphereTableData> tableData = Optional.ofNullable(data.getDatabaseData().get(databaseName)).map(optional -> optional.getSchemaData().get(schemaName))
+ .map(ShardingSphereSchemaData::getTableData).map(shardingSphereData -> shardingSphereData.get(table.getName()));
+ return tableData.map(this::createMemoryEnumerator).orElseGet(this::createEmptyEnumerable);
+ }
+
+ private Enumerable<Object[]> createMemoryEnumerator(final ShardingSphereTableData tableData) {
+ return new AbstractEnumerable<Object[]>() {
+
+ @Override
+ public Enumerator<Object[]> enumerator() {
+ return new MemoryEnumerator(tableData.getRows());
+ }
+ };
+ }
+
private Collection<Statement> getStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) {
Collection<Statement> result = new LinkedList<>();
for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) {
diff --git a/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java b/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java
index 363a8693521..260f0c74d3f 100644
--- a/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java
+++ b/kernel/sql-federation/executor/original/src/main/java/org/apache/shardingsphere/sqlfederation/original/OriginalSQLFederationExecutor.java
@@ -70,7 +70,7 @@ public final class OriginalSQLFederationExecutor implements SQLFederationExecuto
private ConfigurationProperties props;
- private ShardingSphereData shardingSphereData;
+ private ShardingSphereData data;
private JDBCExecutor jdbcExecutor;
@@ -90,12 +90,12 @@ public final class OriginalSQLFederationExecutor implements SQLFederationExecuto
@Override
public void init(final String databaseName, final String schemaName, final ShardingSphereMetaData metaData,
- final ShardingSphereData shardingSphereData, final JDBCExecutor jdbcExecutor, final EventBusContext eventBusContext) {
+ final ShardingSphereData data, final JDBCExecutor jdbcExecutor, final EventBusContext eventBusContext) {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.optimizerContext = OptimizerContextFactory.create(metaData.getDatabases(), metaData.getGlobalRuleMetaData());
this.globalRuleMetaData = metaData.getGlobalRuleMetaData();
- this.shardingSphereData = shardingSphereData;
+ this.data = data;
this.props = metaData.getProps();
this.jdbcExecutor = jdbcExecutor;
this.eventBusContext = eventBusContext;
@@ -122,7 +122,7 @@ public final class OriginalSQLFederationExecutor implements SQLFederationExecuto
final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationExecutorContext federationContext) throws SQLException {
TableScanExecutorContext executorContext = new TableScanExecutorContext(databaseName, schemaName, props, federationContext);
FilterableTableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData,
- executorContext, shardingSphereData, eventBusContext);
+ executorContext, data, eventBusContext);
FilterableDatabase database = new FilterableDatabase(federationContext.getDatabases().get(databaseName.toLowerCase()), JAVA_TYPE_FACTORY, executor);
// TODO support database.schema.table query when switch to AdvancedFederationExecutor, calcite jdbc just support schema.table query now
connection.getRootSchema().add(schemaName, database.getSubSchema(schemaName));
diff --git a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java
index 53b845ab21c..e602275f515 100644
--- a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java
+++ b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableDatabase.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.sqlfederation.optimizer.metadata.translatable;
import lombok.Getter;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.impl.AbstractSchema;
-import org.apache.shardingsphere.sqlfederation.optimizer.executor.TableScanExecutor;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.sqlfederation.optimizer.executor.TableScanExecutor;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -46,7 +46,7 @@ public final class TranslatableDatabase extends AbstractSchema {
private Map<String, Schema> createSubSchemaMap(final ShardingSphereDatabase database, final TableScanExecutor executor) {
Map<String, Schema> result = new LinkedHashMap<>(database.getSchemas().size(), 1);
for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
- result.put(entry.getKey(), new TranslatableSchema(entry.getKey(), entry.getValue(), executor));
+ result.put(entry.getKey(), new TranslatableSchema(entry.getKey(), entry.getValue(), null, executor));
}
return result;
}
diff --git a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java
index a0f4cda48bc..77dbe31e9b5 100644
--- a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java
+++ b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/translatable/TranslatableSchema.java
@@ -18,18 +18,25 @@
package org.apache.shardingsphere.sqlfederation.optimizer.metadata.translatable;
import lombok.Getter;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.ViewTable;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
import org.apache.shardingsphere.sqlfederation.optimizer.executor.TableScanExecutor;
import org.apache.shardingsphere.sqlfederation.optimizer.metadata.statistic.FederationStatistic;
+import org.apache.shardingsphere.sqlfederation.optimizer.util.SQLFederationDataTypeUtil;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
/**
- * Filterable schema.
+ * Translatable schema.
*/
@Getter
public final class TranslatableSchema extends AbstractSchema {
@@ -38,17 +45,27 @@ public final class TranslatableSchema extends AbstractSchema {
private final Map<String, Table> tableMap;
- public TranslatableSchema(final String schemaName, final ShardingSphereSchema schema, final TableScanExecutor executor) {
+ public TranslatableSchema(final String schemaName, final ShardingSphereSchema schema, final JavaTypeFactory javaTypeFactory, final TableScanExecutor executor) {
name = schemaName;
- tableMap = createTableMap(schema, executor);
+ tableMap = createTableMap(schema, javaTypeFactory, executor);
}
- private Map<String, Table> createTableMap(final ShardingSphereSchema schema, final TableScanExecutor executor) {
+ private Map<String, Table> createTableMap(final ShardingSphereSchema schema, final JavaTypeFactory javaTypeFactory, final TableScanExecutor executor) {
Map<String, Table> result = new LinkedHashMap<>(schema.getTables().size(), 1);
for (ShardingSphereTable each : schema.getTables().values()) {
- // TODO implement table statistic logic after using custom operators
- result.put(each.getName(), new FederationTranslatableTable(each, executor, new FederationStatistic()));
+ if (schema.containsView(each.getName())) {
+ result.put(each.getName(), getViewTable(schema, each, javaTypeFactory));
+ } else {
+ // TODO implement table statistic logic after using custom operators
+ result.put(each.getName(), new FederationTranslatableTable(each, executor, new FederationStatistic()));
+ }
}
return result;
}
+
+ private static ViewTable getViewTable(final ShardingSphereSchema schema, final ShardingSphereTable table, final JavaTypeFactory javaTypeFactory) {
+ RelDataType relDataType = SQLFederationDataTypeUtil.createRelDataType(table, javaTypeFactory);
+ ShardingSphereView view = schema.getView(table.getName());
+ return new ViewTable(javaTypeFactory.getJavaClass(relDataType), RelDataTypeImpl.proto(relDataType), view.getViewDefinition(), Collections.emptyList(), Collections.emptyList());
+ }
}
diff --git a/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java b/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java
index 56a73299af1..da81b1c82a7 100644
--- a/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java
+++ b/kernel/sql-federation/optimizer/src/test/java/org/apache/shardingsphere/sqlfederation/optimizer/SQLOptimizeEngineTest.java
@@ -126,7 +126,7 @@ public final class SQLOptimizeEngineTest {
private SqlToRelConverter createSqlToRelConverter(final ShardingSphereSchema schema) {
CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(new Properties());
RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
- TranslatableSchema federationSchema = new TranslatableSchema(SCHEMA_NAME, schema, null);
+ TranslatableSchema federationSchema = new TranslatableSchema(SCHEMA_NAME, schema, new JavaTypeFactoryImpl(), null);
CalciteCatalogReader catalogReader = SQLFederationPlannerUtil.createCatalogReader(SCHEMA_NAME, federationSchema, relDataTypeFactory, connectionConfig);
H2DatabaseType databaseType = new H2DatabaseType();
SqlValidator validator = SQLFederationPlannerUtil.createSqlValidator(catalogReader, relDataTypeFactory, databaseType, connectionConfig);