You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/07/30 09:25:41 UTC
[shardingsphere] branch master updated: Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a696de87196 Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)
a696de87196 is described below
commit a696de87196885f532b07c9a5b5c2b2bc95c8e06
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Sat Jul 30 17:25:29 2022 +0800
Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)
---
.../advanced/AdvancedFederationExecutor.java | 29 ++--
.../advanced/resultset/FederationResultSet.java | 13 +-
.../resultset/FederationResultSetMetaData.java | 176 +++++++++++++++++++++
.../executor/original/schema/FilterableSchema.java | 1 +
4 files changed, 206 insertions(+), 13 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
index efa13524d4b..6782cd3f8aa 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.federation.executor.advanced;
+import com.google.common.base.Preconditions;
import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.config.CalciteConnectionConfig;
@@ -30,6 +31,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.runtime.Bindable;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -48,6 +51,7 @@ import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerCon
import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.Connection;
@@ -91,22 +95,29 @@ public final class AdvancedFederationExecutor implements FederationExecutor {
@Override
public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final JDBCExecutorCallback<? extends ExecuteResult> callback, final FederationContext federationContext) throws SQLException {
- SQLStatement sqlStatement = federationContext.getLogicSQL().getSqlStatementContext().getSqlStatement();
- Enumerator<Object[]> enumerator = execute(sqlStatement, federationContext, prepareEngine, callback).enumerator();
- resultSet = new FederationResultSet(enumerator, federationContext.getLogicSQL().getSqlStatementContext());
+ SQLStatementContext<?> sqlStatementContext = federationContext.getLogicSQL().getSqlStatementContext();
+ Preconditions.checkArgument(sqlStatementContext instanceof SelectStatementContext, "SQL statement context must be select statement context.");
+ ShardingSphereSchema schema = federationContext.getDatabases().get(databaseName.toLowerCase()).getSchema(schemaName);
+ FilterableSchema filterableSchema = createFilterableSchema(prepareEngine, schema, callback, federationContext);
+ Enumerator<Object[]> enumerator = execute(sqlStatementContext.getSqlStatement(), filterableSchema).enumerator();
+ resultSet = new FederationResultSet(enumerator, schema, filterableSchema, sqlStatementContext);
return resultSet;
}
- @SuppressWarnings("unchecked")
- private Enumerable<Object[]> execute(final SQLStatement sqlStatement, final FederationContext federationContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends ExecuteResult> callback) {
+ private FilterableSchema createFilterableSchema(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final ShardingSphereSchema schema,
+ final JDBCExecutorCallback<? extends ExecuteResult> callback, final FederationContext federationContext) {
FilterableTableScanExecutorContext executorContext = new FilterableTableScanExecutorContext(databaseName, schemaName, props, federationContext);
FilterableTableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData, executorContext, eventBusContext);
+ FederationSchemaMetaData schemaMetaData = new FederationSchemaMetaData(schemaName, schema.getTables());
+ return new FilterableSchema(schemaMetaData, executor);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Enumerable<Object[]> execute(final SQLStatement sqlStatement, final FilterableSchema filterableSchema) {
+ // TODO remove OptimizerPlannerContextFactory call and use setup executor to handle this logic
CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(OptimizerPlannerContextFactory.createConnectionProperties());
RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
- FederationSchemaMetaData schemaMetaData = new FederationSchemaMetaData(schemaName, federationContext.getDatabases().get(databaseName).getSchema(schemaName).getTables());
- // TODO remove OptimizerPlannerContextFactory call and use setup executor to handle this logic
- CalciteCatalogReader catalogReader = OptimizerPlannerContextFactory.createCatalogReader(schemaName, new FilterableSchema(schemaMetaData, executor), relDataTypeFactory, connectionConfig);
+ CalciteCatalogReader catalogReader = OptimizerPlannerContextFactory.createCatalogReader(schemaName, filterableSchema, relDataTypeFactory, connectionConfig);
SqlValidator validator = OptimizerPlannerContextFactory.createValidator(catalogReader, relDataTypeFactory, connectionConfig);
SqlToRelConverter converter = OptimizerPlannerContextFactory.createConverter(catalogReader, validator, relDataTypeFactory);
RelNode bestPlan = new ShardingSphereOptimizer(optimizerContext, converter).optimize(databaseName, schemaName, sqlStatement);
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
index c1fba3fd068..9cfed743252 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
@@ -17,11 +17,14 @@
package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.Enumerator;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
import org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
+import org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import java.io.InputStream;
import java.io.Reader;
@@ -61,15 +64,18 @@ public final class FederationResultSet extends AbstractUnsupportedOperationResul
private final Map<String, Integer> columnLabelAndIndexMap;
+ private final FederationResultSetMetaData resultSetMetaData;
+
private Object[] currentRows;
private boolean wasNull;
private boolean closed;
- public FederationResultSet(final Enumerator<Object[]> enumerator, final SQLStatementContext<?> sqlStatementContext) {
+ public FederationResultSet(final Enumerator<Object[]> enumerator, final ShardingSphereSchema schema, final FilterableSchema filterableSchema, final SQLStatementContext<?> sqlStatementContext) {
this.enumerator = enumerator;
columnLabelAndIndexMap = createColumnLabelAndIndexMap(sqlStatementContext);
+ resultSetMetaData = new FederationResultSetMetaData(schema, filterableSchema, new JavaTypeFactoryImpl(), (SelectStatementContext) sqlStatementContext);
}
private Map<String, Integer> createColumnLabelAndIndexMap(final SQLStatementContext<?> sqlStatementContext) {
@@ -312,8 +318,7 @@ public final class FederationResultSet extends AbstractUnsupportedOperationResul
@Override
public ResultSetMetaData getMetaData() throws SQLException {
- // TODO implement getMetaData for federation resultset
- return null;
+ return resultSetMetaData;
}
@Override
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
new file mode 100644
index 00000000000..fbd2f44880d
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Table;
+import org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
+import org.apache.shardingsphere.infra.binder.segment.select.projection.impl.ColumnProjection;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Federation result set meta data.
+ */
+@RequiredArgsConstructor
+public final class FederationResultSetMetaData extends WrapperAdapter implements ResultSetMetaData {
+
+ private final ShardingSphereSchema schema;
+
+ private final FilterableSchema filterableSchema;
+
+ private final RelDataTypeFactory relDataTypeFactory;
+
+ private final SelectStatementContext selectStatementContext;
+
+ @Override
+ public int getColumnCount() throws SQLException {
+ return selectStatementContext.getProjectionsContext().getExpandProjections().size();
+ }
+
+ @Override
+ public boolean isAutoIncrement(final int column) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean isCaseSensitive(final int column) {
+ return true;
+ }
+
+ @Override
+ public boolean isSearchable(final int column) {
+ return false;
+ }
+
+ @Override
+ public boolean isCurrency(final int column) {
+ return false;
+ }
+
+ @Override
+ public int isNullable(final int column) {
+ Optional<Table> table = findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional)));
+ return !table.isPresent() || table.get().getRowType(relDataTypeFactory).isNullable() ? ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls;
+ }
+
+ @Override
+ public boolean isSigned(final int column) throws SQLException {
+ return true;
+ }
+
+ @Override
+ public int getColumnDisplaySize(final int column) {
+ return findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional))).map(optional -> optional.getRowType(relDataTypeFactory).getPrecision()).orElse(0);
+ }
+
+ @Override
+ public String getColumnLabel(final int column) throws SQLException {
+ Projection projection = selectStatementContext.getProjectionsContext().getExpandProjections().get(column - 1);
+ if (projection instanceof ColumnProjection) {
+ return ((ColumnProjection) projection).getName();
+ }
+ return projection.getColumnLabel();
+ }
+
+ @Override
+ public String getColumnName(final int column) throws SQLException {
+ Projection projection = selectStatementContext.getProjectionsContext().getExpandProjections().get(column - 1);
+ if (projection instanceof ColumnProjection) {
+ return ((ColumnProjection) projection).getName();
+ }
+ return projection.getColumnLabel();
+ }
+
+ @Override
+ public String getSchemaName(final int column) {
+ return DefaultDatabase.LOGIC_NAME;
+ }
+
+ @Override
+ public int getPrecision(final int column) {
+ Optional<Table> table = findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional)));
+ return !table.isPresent() || RelDataType.PRECISION_NOT_SPECIFIED == table.get().getRowType(relDataTypeFactory).getPrecision() ? 0 : table.get().getRowType(relDataTypeFactory).getPrecision();
+ }
+
+ @Override
+ public int getScale(final int column) {
+ Optional<Table> table = findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional)));
+ return !table.isPresent() || RelDataType.SCALE_NOT_SPECIFIED == table.get().getRowType(relDataTypeFactory).getScale() ? 0 : table.get().getRowType(relDataTypeFactory).getScale();
+ }
+
+ @Override
+ public String getTableName(final int column) throws SQLException {
+ return findTableName(column).orElse("");
+ }
+
+ @Override
+ public String getCatalogName(final int column) {
+ return DefaultDatabase.LOGIC_NAME;
+ }
+
+ @Override
+ public int getColumnType(final int column) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public String getColumnTypeName(final int column) throws SQLException {
+ return "";
+ }
+
+ @Override
+ public boolean isReadOnly(final int column) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean isWritable(final int column) {
+ return false;
+ }
+
+ @Override
+ public boolean isDefinitelyWritable(final int column) {
+ return false;
+ }
+
+ @Override
+ public String getColumnClassName(final int column) {
+ return "";
+ }
+
+ private Optional<String> findTableName(final int column) {
+ Projection projection = selectStatementContext.getProjectionsContext().getExpandProjections().get(column - 1);
+ if (projection instanceof ColumnProjection) {
+ Map<String, String> tableNamesByColumnProjection =
+ selectStatementContext.getTablesContext().findTableNamesByColumnProjection(Collections.singletonList((ColumnProjection) projection), schema);
+ return Optional.of(tableNamesByColumnProjection.get(projection.getExpression()));
+ }
+ return Optional.empty();
+ }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
index 318823b777b..ca851972870 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
@@ -47,6 +47,7 @@ public final class FilterableSchema extends AbstractSchema {
private Map<String, Table> createTableMap(final FederationSchemaMetaData schemaMetaData, final FilterableTableScanExecutor executor) {
Map<String, Table> result = new LinkedHashMap<>(schemaMetaData.getTables().size(), 1);
for (FederationTableMetaData each : schemaMetaData.getTables().values()) {
+ // TODO implement table statistic logic after using custom operators
result.put(each.getName(), new FilterableTable(each, executor, new FederationTableStatistic()));
}
return result;