You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/07/30 09:25:41 UTC

[shardingsphere] branch master updated: Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a696de87196 Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)
a696de87196 is described below

commit a696de87196885f532b07c9a5b5c2b2bc95c8e06
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Sat Jul 30 17:25:29 2022 +0800

    Add FederationResultSetMetaData to implement AdvancedFederationExecutor logic (#19712)
---
 .../advanced/AdvancedFederationExecutor.java       |  29 ++--
 .../advanced/resultset/FederationResultSet.java    |  13 +-
 .../resultset/FederationResultSetMetaData.java     | 176 +++++++++++++++++++++
 .../executor/original/schema/FilterableSchema.java |   1 +
 4 files changed, 206 insertions(+), 13 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
index efa13524d4b..6782cd3f8aa 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.federation.executor.advanced;
 
+import com.google.common.base.Preconditions;
 import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.config.CalciteConnectionConfig;
@@ -30,6 +31,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.runtime.Bindable;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -48,6 +51,7 @@ import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerCon
 import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
 import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
 import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.Connection;
@@ -91,22 +95,29 @@ public final class AdvancedFederationExecutor implements FederationExecutor {
     @Override
     public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                                   final JDBCExecutorCallback<? extends ExecuteResult> callback, final FederationContext federationContext) throws SQLException {
-        SQLStatement sqlStatement = federationContext.getLogicSQL().getSqlStatementContext().getSqlStatement();
-        Enumerator<Object[]> enumerator = execute(sqlStatement, federationContext, prepareEngine, callback).enumerator();
-        resultSet = new FederationResultSet(enumerator, federationContext.getLogicSQL().getSqlStatementContext());
+        SQLStatementContext<?> sqlStatementContext = federationContext.getLogicSQL().getSqlStatementContext();
+        Preconditions.checkArgument(sqlStatementContext instanceof SelectStatementContext, "SQL statement context must be select statement context.");
+        ShardingSphereSchema schema = federationContext.getDatabases().get(databaseName.toLowerCase()).getSchema(schemaName);
+        FilterableSchema filterableSchema = createFilterableSchema(prepareEngine, schema, callback, federationContext);
+        Enumerator<Object[]> enumerator = execute(sqlStatementContext.getSqlStatement(), filterableSchema).enumerator();
+        resultSet = new FederationResultSet(enumerator, schema, filterableSchema, sqlStatementContext);
         return resultSet;
     }
     
-    @SuppressWarnings("unchecked")
-    private Enumerable<Object[]> execute(final SQLStatement sqlStatement, final FederationContext federationContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
-                                         final JDBCExecutorCallback<? extends ExecuteResult> callback) {
+    private FilterableSchema createFilterableSchema(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final ShardingSphereSchema schema,
+                                                    final JDBCExecutorCallback<? extends ExecuteResult> callback, final FederationContext federationContext) {
         FilterableTableScanExecutorContext executorContext = new FilterableTableScanExecutorContext(databaseName, schemaName, props, federationContext);
         FilterableTableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData, executorContext, eventBusContext);
+        FederationSchemaMetaData schemaMetaData = new FederationSchemaMetaData(schemaName, schema.getTables());
+        return new FilterableSchema(schemaMetaData, executor);
+    }
+    
+    @SuppressWarnings("unchecked")
+    private Enumerable<Object[]> execute(final SQLStatement sqlStatement, final FilterableSchema filterableSchema) {
+        // TODO remove OptimizerPlannerContextFactory call and use setup executor to handle this logic
         CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(OptimizerPlannerContextFactory.createConnectionProperties());
         RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
-        FederationSchemaMetaData schemaMetaData = new FederationSchemaMetaData(schemaName, federationContext.getDatabases().get(databaseName).getSchema(schemaName).getTables());
-        // TODO remove OptimizerPlannerContextFactory call and use setup executor to handle this logic
-        CalciteCatalogReader catalogReader = OptimizerPlannerContextFactory.createCatalogReader(schemaName, new FilterableSchema(schemaMetaData, executor), relDataTypeFactory, connectionConfig);
+        CalciteCatalogReader catalogReader = OptimizerPlannerContextFactory.createCatalogReader(schemaName, filterableSchema, relDataTypeFactory, connectionConfig);
         SqlValidator validator = OptimizerPlannerContextFactory.createValidator(catalogReader, relDataTypeFactory, connectionConfig);
         SqlToRelConverter converter = OptimizerPlannerContextFactory.createConverter(catalogReader, validator, relDataTypeFactory);
         RelNode bestPlan = new ShardingSphereOptimizer(optimizerContext, converter).optimize(databaseName, schemaName, sqlStatement);
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
index c1fba3fd068..9cfed743252 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
@@ -17,11 +17,14 @@
 
 package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
 
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.linq4j.Enumerator;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
 import org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
+import org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 
 import java.io.InputStream;
 import java.io.Reader;
@@ -61,15 +64,18 @@ public final class FederationResultSet extends AbstractUnsupportedOperationResul
     
     private final Map<String, Integer> columnLabelAndIndexMap;
     
+    private final FederationResultSetMetaData resultSetMetaData;
+    
     private Object[] currentRows;
     
     private boolean wasNull;
     
     private boolean closed;
     
-    public FederationResultSet(final Enumerator<Object[]> enumerator, final SQLStatementContext<?> sqlStatementContext) {
+    public FederationResultSet(final Enumerator<Object[]> enumerator, final ShardingSphereSchema schema, final FilterableSchema filterableSchema, final SQLStatementContext<?> sqlStatementContext) {
         this.enumerator = enumerator;
         columnLabelAndIndexMap = createColumnLabelAndIndexMap(sqlStatementContext);
+        resultSetMetaData = new FederationResultSetMetaData(schema, filterableSchema, new JavaTypeFactoryImpl(), (SelectStatementContext) sqlStatementContext);
     }
     
     private Map<String, Integer> createColumnLabelAndIndexMap(final SQLStatementContext<?> sqlStatementContext) {
@@ -312,8 +318,7 @@ public final class FederationResultSet extends AbstractUnsupportedOperationResul
     
     @Override
     public ResultSetMetaData getMetaData() throws SQLException {
-        // TODO implement getMetaData for federation resultset
-        return null;
+        return resultSetMetaData;
     }
     
     @Override
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
new file mode 100644
index 00000000000..fbd2f44880d
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Table;
+import org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
+import org.apache.shardingsphere.infra.binder.segment.select.projection.impl.ColumnProjection;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Federation result set meta data.
+ */
+@RequiredArgsConstructor
+public final class FederationResultSetMetaData extends WrapperAdapter implements ResultSetMetaData {
+    
+    private final ShardingSphereSchema schema;
+    
+    private final FilterableSchema filterableSchema;
+    
+    private final RelDataTypeFactory relDataTypeFactory;
+    
+    private final SelectStatementContext selectStatementContext;
+    
+    @Override
+    public int getColumnCount() throws SQLException {
+        return selectStatementContext.getProjectionsContext().getExpandProjections().size();
+    }
+    
+    @Override
+    public boolean isAutoIncrement(final int column) throws SQLException {
+        return false;
+    }
+    
+    @Override
+    public boolean isCaseSensitive(final int column) {
+        return true;
+    }
+    
+    @Override
+    public boolean isSearchable(final int column) {
+        return false;
+    }
+    
+    @Override
+    public boolean isCurrency(final int column) {
+        return false;
+    }
+    
+    @Override
+    public int isNullable(final int column) {
+        Optional<Table> table = findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional)));
+        return !table.isPresent() || table.get().getRowType(relDataTypeFactory).isNullable() ? ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls;
+    }
+    
+    @Override
+    public boolean isSigned(final int column) throws SQLException {
+        return true;
+    }
+    
+    @Override
+    public int getColumnDisplaySize(final int column) {
+        return findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional))).map(optional -> optional.getRowType(relDataTypeFactory).getPrecision()).orElse(0);
+    }
+    
+    @Override
+    public String getColumnLabel(final int column) throws SQLException {
+        Projection projection = selectStatementContext.getProjectionsContext().getExpandProjections().get(column - 1);
+        if (projection instanceof ColumnProjection) {
+            return ((ColumnProjection) projection).getName();
+        }
+        return projection.getColumnLabel();
+    }
+    
+    @Override
+    public String getColumnName(final int column) throws SQLException {
+        Projection projection = selectStatementContext.getProjectionsContext().getExpandProjections().get(column - 1);
+        if (projection instanceof ColumnProjection) {
+            return ((ColumnProjection) projection).getName();
+        }
+        return projection.getColumnLabel();
+    }
+    
+    @Override
+    public String getSchemaName(final int column) {
+        return DefaultDatabase.LOGIC_NAME;
+    }
+    
+    @Override
+    public int getPrecision(final int column) {
+        Optional<Table> table = findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional)));
+        return !table.isPresent() || RelDataType.PRECISION_NOT_SPECIFIED == table.get().getRowType(relDataTypeFactory).getPrecision() ? 0 : table.get().getRowType(relDataTypeFactory).getPrecision();
+    }
+    
+    @Override
+    public int getScale(final int column) {
+        Optional<Table> table = findTableName(column).flatMap(optional -> Optional.ofNullable(filterableSchema.getTable(optional)));
+        return !table.isPresent() || RelDataType.SCALE_NOT_SPECIFIED == table.get().getRowType(relDataTypeFactory).getScale() ? 0 : table.get().getRowType(relDataTypeFactory).getScale();
+    }
+    
+    @Override
+    public String getTableName(final int column) throws SQLException {
+        return findTableName(column).orElse("");
+    }
+    
+    @Override
+    public String getCatalogName(final int column) {
+        return DefaultDatabase.LOGIC_NAME;
+    }
+    
+    @Override
+    public int getColumnType(final int column) throws SQLException {
+        return 0;
+    }
+    
+    @Override
+    public String getColumnTypeName(final int column) throws SQLException {
+        return "";
+    }
+    
+    @Override
+    public boolean isReadOnly(final int column) throws SQLException {
+        return false;
+    }
+    
+    @Override
+    public boolean isWritable(final int column) {
+        return false;
+    }
+    
+    @Override
+    public boolean isDefinitelyWritable(final int column) {
+        return false;
+    }
+    
+    @Override
+    public String getColumnClassName(final int column) {
+        return "";
+    }
+    
+    private Optional<String> findTableName(final int column) {
+        Projection projection = selectStatementContext.getProjectionsContext().getExpandProjections().get(column - 1);
+        if (projection instanceof ColumnProjection) {
+            Map<String, String> tableNamesByColumnProjection =
+                    selectStatementContext.getTablesContext().findTableNamesByColumnProjection(Collections.singletonList((ColumnProjection) projection), schema);
+            return Optional.of(tableNamesByColumnProjection.get(projection.getExpression()));
+        }
+        return Optional.empty();
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
index 318823b777b..ca851972870 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
@@ -47,6 +47,7 @@ public final class FilterableSchema extends AbstractSchema {
     private Map<String, Table> createTableMap(final FederationSchemaMetaData schemaMetaData, final FilterableTableScanExecutor executor) {
         Map<String, Table> result = new LinkedHashMap<>(schemaMetaData.getTables().size(), 1);
         for (FederationTableMetaData each : schemaMetaData.getTables().values()) {
+            // TODO implement table statistic logic after using custom operators
             result.put(each.getName(), new FilterableTable(each, executor, new FederationTableStatistic()));
         }
         return result;