You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/07/30 01:19:32 UTC
[shardingsphere] branch master updated: Implement AdvancedFederationExecutor bind logic (#19704)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3dc21da55e8 Implement AdvancedFederationExecutor bind logic (#19704)
3dc21da55e8 is described below
commit 3dc21da55e8cd66efb6c45f0a45e85de93e83fc5
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Sat Jul 30 09:19:25 2022 +0800
Implement AdvancedFederationExecutor bind logic (#19704)
* Implement AdvancedFederationExecutor bind logic
* Adjust RBO optimization sequence and optimizer unit test
* optimize code style
---
.../DatabaseDiscoveryRuleQueryResultSetTest.java | 2 +-
.../impl/driver/jdbc/type/util}/ResultSetUtil.java | 2 +-
.../driver/jdbc/type/util}/ResultSetUtilTest.java | 2 +-
.../advanced/AdvancedFederationExecutor.java | 97 ++++----
.../advanced/resultset/FederationResultSet.java | 253 +++++++++++++--------
.../advanced/AdvancedFederationExecutorTest.java | 7 +-
.../optimizer/ShardingSphereOptimizer.java | 3 +-
.../planner/OptimizerPlannerContextFactory.java | 39 +++-
.../planner/QueryOptimizePlannerFactory.java | 28 ++-
.../optimizer/ShardingSphereOptimizerTest.java | 87 ++++---
.../impl/enumerable/EnumerableMergedResult.java | 81 -------
.../core/resultset/DatabaseMetaDataResultSet.java | 1 +
.../core/resultset/ShardingSphereResultSet.java | 7 +-
13 files changed, 326 insertions(+), 283 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/query/DatabaseDiscoveryRuleQueryResultSetTest.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/query/DatabaseDiscoveryRuleQueryResultSetTest.java
index 940596878d3..c53257c1ebe 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/query/DatabaseDiscoveryRuleQueryResultSetTest.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/query/DatabaseDiscoveryRuleQueryResultSetTest.java
@@ -61,7 +61,7 @@ public final class DatabaseDiscoveryRuleQueryResultSetTest {
assertColumns(resultSet.getColumnNames());
assertRowData(new ArrayList<>(resultSet.getRowData()));
}
-
+
@Test
public void assertInitWithNullDatabaseDiscoveryRule() {
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ResultSetUtil.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/util/ResultSetUtil.java
similarity index 98%
rename from shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ResultSetUtil.java
rename to shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/util/ResultSetUtil.java
index c1cfc2ad2e4..214298e4694 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ResultSetUtil.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/util/ResultSetUtil.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.jdbc.core.resultset;
+package org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ResultSetUtilTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/util/ResultSetUtilTest.java
similarity index 98%
rename from shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ResultSetUtilTest.java
rename to shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/util/ResultSetUtilTest.java
index 9364f8dc762..2f2b2b24bcf 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ResultSetUtilTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/result/query/impl/driver/jdbc/type/util/ResultSetUtilTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.driver.jdbc.core.resultset;
+package org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
index 3dd7b307eea..efa13524d4b 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
@@ -17,32 +17,43 @@
package org.apache.shardingsphere.infra.federation.executor.advanced;
-import org.apache.calcite.interpreter.InterpretableConvention;
-import org.apache.calcite.interpreter.InterpretableConverter;
+import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.config.CalciteConnectionConfigImpl;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.runtime.Bindable;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.federation.executor.FederationContext;
import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import org.apache.shardingsphere.infra.federation.executor.advanced.resultset.FederationResultSet;
+import org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanExecutor;
+import org.apache.shardingsphere.infra.federation.executor.original.table.FilterableTableScanExecutorContext;
import org.apache.shardingsphere.infra.federation.optimizer.ShardingSphereOptimizer;
import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
-import org.apache.shardingsphere.infra.merge.result.impl.enumerable.EnumerableMergedResult;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
-import org.apache.shardingsphere.sql.parser.api.CacheOption;
+import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
+import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collections;
/**
* Advanced federation executor.
@@ -55,57 +66,61 @@ public final class AdvancedFederationExecutor implements FederationExecutor {
private final OptimizerContext optimizerContext;
- private final ShardingSphereOptimizer optimizer;
+ private final ShardingSphereRuleMetaData globalRuleMetaData;
- private ResultSet federationResultSet;
+ private final ConfigurationProperties props;
- public AdvancedFederationExecutor(final String databaseName, final String schemaName, final OptimizerContext optimizerContext) {
+ private final JDBCExecutor jdbcExecutor;
+
+ private final EventBusContext eventBusContext;
+
+ private ResultSet resultSet;
+
+ public AdvancedFederationExecutor(final String databaseName, final String schemaName, final OptimizerContext optimizerContext,
+ final ShardingSphereRuleMetaData globalRuleMetaData, final ConfigurationProperties props, final JDBCExecutor jdbcExecutor,
+ final EventBusContext eventBusContext) {
this.databaseName = databaseName;
this.schemaName = schemaName;
this.optimizerContext = optimizerContext;
- optimizer = new ShardingSphereOptimizer(optimizerContext);
+ this.globalRuleMetaData = globalRuleMetaData;
+ this.props = props;
+ this.jdbcExecutor = jdbcExecutor;
+ this.eventBusContext = eventBusContext;
}
@Override
public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final JDBCExecutorCallback<? extends ExecuteResult> callback, final FederationContext federationContext) throws SQLException {
- String sql = federationContext.getLogicSQL().getSql();
- ShardingSphereSQLParserEngine parserEngine = new ShardingSphereSQLParserEngine(
- federationContext.getDatabases().get(databaseName.toLowerCase()).getProtocolType().getType(), new CacheOption(1, 1), new CacheOption(1, 1), false);
- SQLStatement sqlStatement = parserEngine.parse(sql, false);
- Enumerable<Object[]> enumerableResult = execute(sqlStatement);
- MergedResult mergedResult = new EnumerableMergedResult(enumerableResult);
- federationResultSet = new FederationResultSet(mergedResult);
- return federationResultSet;
+ SQLStatement sqlStatement = federationContext.getLogicSQL().getSqlStatementContext().getSqlStatement();
+ Enumerator<Object[]> enumerator = execute(sqlStatement, federationContext, prepareEngine, callback).enumerator();
+ resultSet = new FederationResultSet(enumerator, federationContext.getLogicSQL().getSqlStatementContext());
+ return resultSet;
}
- private Enumerable<Object[]> execute(final SQLStatement sqlStatement) {
- // TODO
- return execute(optimizer.optimize(databaseName, schemaName, sqlStatement));
- }
-
- private Enumerable<Object[]> execute(final RelNode bestPlan) {
- RelOptCluster cluster = bestPlan.getCluster();
- SqlValidator validator = optimizerContext.getPlannerContexts().get(databaseName).getValidators().get(schemaName);
- SqlToRelConverter converter = optimizerContext.getPlannerContexts().get(databaseName).getConverters().get(schemaName);
- return new FederateInterpretableConverter(
- cluster, cluster.traitSetOf(InterpretableConvention.INSTANCE), bestPlan).bind(new AdvancedExecuteDataContext(validator, converter));
+ @SuppressWarnings("unchecked")
+ private Enumerable<Object[]> execute(final SQLStatement sqlStatement, final FederationContext federationContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final JDBCExecutorCallback<? extends ExecuteResult> callback) {
+ FilterableTableScanExecutorContext executorContext = new FilterableTableScanExecutorContext(databaseName, schemaName, props, federationContext);
+ FilterableTableScanExecutor executor = new FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, globalRuleMetaData, executorContext, eventBusContext);
+ CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(OptimizerPlannerContextFactory.createConnectionProperties());
+ RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
+ FederationSchemaMetaData schemaMetaData = new FederationSchemaMetaData(schemaName, federationContext.getDatabases().get(databaseName).getSchema(schemaName).getTables());
+ // TODO remove OptimizerPlannerContextFactory call and use setup executor to handle this logic
+ CalciteCatalogReader catalogReader = OptimizerPlannerContextFactory.createCatalogReader(schemaName, new FilterableSchema(schemaMetaData, executor), relDataTypeFactory, connectionConfig);
+ SqlValidator validator = OptimizerPlannerContextFactory.createValidator(catalogReader, relDataTypeFactory, connectionConfig);
+ SqlToRelConverter converter = OptimizerPlannerContextFactory.createConverter(catalogReader, validator, relDataTypeFactory);
+ RelNode bestPlan = new ShardingSphereOptimizer(optimizerContext, converter).optimize(databaseName, schemaName, sqlStatement);
+ Bindable<Object[]> executablePlan = EnumerableInterpretable.toBindable(Collections.emptyMap(), null, (EnumerableRel) bestPlan, EnumerableRel.Prefer.ARRAY);
+ return executablePlan.bind(new AdvancedExecuteDataContext(validator, converter));
}
@Override
public ResultSet getResultSet() {
- return federationResultSet;
+ return resultSet;
}
@Override
- public void close() {
- // TODO
- }
-
- public static final class FederateInterpretableConverter extends InterpretableConverter {
-
- public FederateInterpretableConverter(final RelOptCluster cluster, final RelTraitSet traits, final RelNode input) {
- super(cluster, traits, input);
- }
+ public void close() throws SQLException {
+ resultSet.close();
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
index 8e00355d316..c1fba3fd068 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
@@ -17,8 +17,11 @@
package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
+import org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import java.io.InputStream;
import java.io.Reader;
@@ -31,234 +34,271 @@ import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* Federation result set.
*/
-@RequiredArgsConstructor
public final class FederationResultSet extends AbstractUnsupportedOperationResultSet {
- private final MergedResult mergedResult;
+ private static final String ASCII = "Ascii";
+
+ private static final String UNICODE = "Unicode";
+
+ private static final String BINARY = "Binary";
+
+ private final Enumerator<Object[]> enumerator;
+
+ private final Map<String, Integer> columnLabelAndIndexMap;
+
+ private Object[] currentRows;
+
+ private boolean wasNull;
+
+ private boolean closed;
+
+ public FederationResultSet(final Enumerator<Object[]> enumerator, final SQLStatementContext<?> sqlStatementContext) {
+ this.enumerator = enumerator;
+ columnLabelAndIndexMap = createColumnLabelAndIndexMap(sqlStatementContext);
+ }
+
+ private Map<String, Integer> createColumnLabelAndIndexMap(final SQLStatementContext<?> sqlStatementContext) {
+ SelectStatementContext statementContext = (SelectStatementContext) sqlStatementContext;
+ List<Projection> projections = new ArrayList<>(statementContext.getProjectionsContext().getProjections());
+ Map<String, Integer> result = new HashMap<>(projections.size(), 1);
+ for (int columnIndex = 1; columnIndex <= projections.size(); columnIndex++) {
+ result.put(projections.get(columnIndex - 1).getColumnLabel(), columnIndex);
+ }
+ return result;
+ }
@Override
public boolean next() throws SQLException {
- return mergedResult.next();
+ boolean result = enumerator.moveNext();
+ currentRows = result ? enumerator.current() : new Object[]{};
+ return result;
}
@Override
public void close() throws SQLException {
-
+ closed = true;
+ enumerator.close();
+ currentRows = null;
}
@Override
public boolean wasNull() {
- return false;
+ return wasNull;
}
@Override
public String getString(final int columnIndex) throws SQLException {
- return null;
+ return (String) ResultSetUtil.convertValue(getValue(columnIndex, String.class), String.class);
}
@Override
public String getString(final String columnLabel) throws SQLException {
- return null;
+ return getString(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public boolean getBoolean(final int columnIndex) throws SQLException {
- return false;
+ return (boolean) ResultSetUtil.convertValue(getValue(columnIndex, boolean.class), boolean.class);
}
@Override
public boolean getBoolean(final String columnLabel) throws SQLException {
- return false;
+ return getBoolean(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public byte getByte(final int columnIndex) throws SQLException {
- return 0;
+ return (byte) ResultSetUtil.convertValue(getValue(columnIndex, byte.class), byte.class);
}
@Override
public byte getByte(final String columnLabel) throws SQLException {
- return 0;
+ return getByte(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public short getShort(final int columnIndex) throws SQLException {
- return 0;
+ return (short) ResultSetUtil.convertValue(getValue(columnIndex, short.class), short.class);
}
@Override
public short getShort(final String columnLabel) throws SQLException {
- return 0;
+ return getShort(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public int getInt(final int columnIndex) throws SQLException {
- return 0;
+ return (int) ResultSetUtil.convertValue(getValue(columnIndex, int.class), int.class);
}
@Override
public int getInt(final String columnLabel) throws SQLException {
- return 0;
+ return getInt(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public long getLong(final int columnIndex) throws SQLException {
- return 0;
+ return (long) ResultSetUtil.convertValue(getValue(columnIndex, long.class), long.class);
}
@Override
public long getLong(final String columnLabel) throws SQLException {
- return 0;
+ return getLong(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public float getFloat(final int columnIndex) {
- return 0;
+ public float getFloat(final int columnIndex) throws SQLException {
+ return (float) ResultSetUtil.convertValue(getValue(columnIndex, float.class), float.class);
}
@Override
- public float getFloat(final String columnLabel) {
- return 0;
+ public float getFloat(final String columnLabel) throws SQLException {
+ return getFloat(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public double getDouble(final int columnIndex) {
- return 0;
+ public double getDouble(final int columnIndex) throws SQLException {
+ return (double) ResultSetUtil.convertValue(getValue(columnIndex, double.class), double.class);
}
@Override
- public double getDouble(final String columnLabel) {
- return 0;
+ public double getDouble(final String columnLabel) throws SQLException {
+ return getDouble(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public BigDecimal getBigDecimal(final int columnIndex, final int scale) {
- return null;
+ public BigDecimal getBigDecimal(final int columnIndex, final int scale) throws SQLException {
+ return (BigDecimal) ResultSetUtil.convertValue(getValue(columnIndex, BigDecimal.class), BigDecimal.class);
}
@Override
- public BigDecimal getBigDecimal(final String columnLabel, final int scale) {
- return null;
+ public BigDecimal getBigDecimal(final String columnLabel, final int scale) throws SQLException {
+ return getBigDecimal(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public BigDecimal getBigDecimal(final int columnIndex) {
- return null;
+ public BigDecimal getBigDecimal(final int columnIndex) throws SQLException {
+ return (BigDecimal) ResultSetUtil.convertValue(getValue(columnIndex, BigDecimal.class), BigDecimal.class);
}
@Override
- public BigDecimal getBigDecimal(final String columnLabel) {
- return null;
+ public BigDecimal getBigDecimal(final String columnLabel) throws SQLException {
+ return getBigDecimal(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public byte[] getBytes(final int columnIndex) throws SQLException {
- return new byte[0];
+ return (byte[]) ResultSetUtil.convertValue(getValue(columnIndex, byte[].class), byte[].class);
}
@Override
public byte[] getBytes(final String columnLabel) throws SQLException {
- return new byte[0];
+ return getBytes(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public Date getDate(final int columnIndex) throws SQLException {
- return null;
+ return (Date) ResultSetUtil.convertValue(getValue(columnIndex, Date.class), Date.class);
}
@Override
public Date getDate(final String columnLabel) throws SQLException {
- return null;
+ return getDate(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public Date getDate(final int columnIndex, final Calendar cal) throws SQLException {
- return null;
+ return (Date) ResultSetUtil.convertValue(getCalendarValue(columnIndex, Date.class, cal), Date.class);
}
@Override
public Date getDate(final String columnLabel, final Calendar cal) throws SQLException {
- return null;
+ return getDate(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
}
@Override
public Time getTime(final int columnIndex) throws SQLException {
- return null;
+ return (Time) ResultSetUtil.convertValue(getValue(columnIndex, Time.class), Time.class);
}
@Override
public Time getTime(final String columnLabel) throws SQLException {
- return null;
+ return getTime(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public Time getTime(final int columnIndex, final Calendar cal) throws SQLException {
- return null;
+ return (Time) ResultSetUtil.convertValue(getCalendarValue(columnIndex, Time.class, cal), Time.class);
}
@Override
public Time getTime(final String columnLabel, final Calendar cal) throws SQLException {
- return null;
+ return getTime(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
}
@Override
public Timestamp getTimestamp(final int columnIndex) throws SQLException {
- return null;
+ return (Timestamp) ResultSetUtil.convertValue(getValue(columnIndex, Timestamp.class), Timestamp.class);
}
@Override
public Timestamp getTimestamp(final String columnLabel) throws SQLException {
- return null;
+ return getTimestamp(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public Timestamp getTimestamp(final int columnIndex, final Calendar cal) throws SQLException {
- return null;
+ return (Timestamp) ResultSetUtil.convertValue(getCalendarValue(columnIndex, Timestamp.class, cal), Timestamp.class);
}
@Override
public Timestamp getTimestamp(final String columnLabel, final Calendar cal) throws SQLException {
- return null;
+ return getTimestamp(getIndexFromColumnLabelAndIndexMap(columnLabel), cal);
}
@Override
- public InputStream getAsciiStream(final int columnIndex) {
- return null;
+ public InputStream getAsciiStream(final int columnIndex) throws SQLException {
+ return getInputStream(ASCII);
}
@Override
- public InputStream getAsciiStream(final String columnLabel) {
- return null;
+ public InputStream getAsciiStream(final String columnLabel) throws SQLException {
+ return getAsciiStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public InputStream getUnicodeStream(final int columnIndex) {
- return null;
+ public InputStream getUnicodeStream(final int columnIndex) throws SQLException {
+ return getInputStream(UNICODE);
}
@Override
- public InputStream getUnicodeStream(final String columnLabel) {
- return null;
+ public InputStream getUnicodeStream(final String columnLabel) throws SQLException {
+ return getUnicodeStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public InputStream getBinaryStream(final int columnIndex) {
- return null;
+ public InputStream getBinaryStream(final int columnIndex) throws SQLException {
+ return getInputStream(BINARY);
}
@Override
- public InputStream getBinaryStream(final String columnLabel) {
- return null;
+ public InputStream getBinaryStream(final String columnLabel) throws SQLException {
+ return getBinaryStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
@@ -268,42 +308,41 @@ public final class FederationResultSet extends AbstractUnsupportedOperationResul
@Override
public void clearWarnings() throws SQLException {
-
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
+ // TODO implement getMetaData for federation resultset
return null;
}
@Override
public Object getObject(final int columnIndex) throws SQLException {
- return null;
+ return getValue(columnIndex, Object.class);
}
@Override
public Object getObject(final String columnLabel) throws SQLException {
- return null;
+ return getObject(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public int findColumn(final String columnLabel) throws SQLException {
- return 0;
+ return getIndexFromColumnLabelAndIndexMap(columnLabel);
}
@Override
- public Reader getCharacterStream(final int columnIndex) {
- return null;
+ public Reader getCharacterStream(final int columnIndex) throws SQLException {
+ return (Reader) getValue(columnIndex, Reader.class);
}
@Override
- public Reader getCharacterStream(final String columnLabel) {
- return null;
+ public Reader getCharacterStream(final String columnLabel) throws SQLException {
+ return getCharacterStream(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public void setFetchDirection(final int direction) throws SQLException {
-
}
@Override
@@ -336,67 +375,95 @@ public final class FederationResultSet extends AbstractUnsupportedOperationResul
}
@Override
- public Blob getBlob(final int columnIndex) {
- return null;
+ public Blob getBlob(final int columnIndex) throws SQLException {
+ return (Blob) getValue(columnIndex, Blob.class);
}
@Override
- public Blob getBlob(final String columnLabel) {
- return null;
+ public Blob getBlob(final String columnLabel) throws SQLException {
+ return getBlob(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public Clob getClob(final int columnIndex) {
- return null;
+ public Clob getClob(final int columnIndex) throws SQLException {
+ return (Clob) getValue(columnIndex, Clob.class);
}
@Override
- public Clob getClob(final String columnLabel) {
- return null;
+ public Clob getClob(final String columnLabel) throws SQLException {
+ return getClob(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public Array getArray(final int columnIndex) {
- return null;
+ public Array getArray(final int columnIndex) throws SQLException {
+ return (Array) getValue(columnIndex, Array.class);
}
@Override
- public Array getArray(final String columnLabel) {
- return null;
+ public Array getArray(final String columnLabel) throws SQLException {
+ return getArray(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public URL getURL(final int columnIndex) throws SQLException {
- return null;
+ return (URL) getValue(columnIndex, URL.class);
}
@Override
public URL getURL(final String columnLabel) throws SQLException {
- return null;
+ return getURL(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
public boolean isClosed() throws SQLException {
- return false;
+ return closed;
}
@Override
- public SQLXML getSQLXML(final int columnIndex) {
- return null;
+ public SQLXML getSQLXML(final int columnIndex) throws SQLException {
+ return (SQLXML) getValue(columnIndex, SQLXML.class);
}
@Override
- public SQLXML getSQLXML(final String columnLabel) {
- return null;
+ public SQLXML getSQLXML(final String columnLabel) throws SQLException {
+ return getSQLXML(getIndexFromColumnLabelAndIndexMap(columnLabel));
}
@Override
- public String getNString(final int columnIndex) {
- return null;
+ public String getNString(final int columnIndex) throws SQLException {
+ return getString(columnIndex);
}
@Override
- public String getNString(final String columnLabel) {
- return null;
+ public String getNString(final String columnLabel) throws SQLException {
+ return getString(columnLabel);
+ }
+
+ private Integer getIndexFromColumnLabelAndIndexMap(final String columnLabel) throws SQLFeatureNotSupportedException {
+ Integer result = columnLabelAndIndexMap.get(columnLabel);
+ if (null == result) {
+ throw new SQLFeatureNotSupportedException(String.format("can't get index from columnLabel[%s].", columnLabel));
+ }
+ return result;
+ }
+
+ private Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
+ if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) {
+ throw new SQLFeatureNotSupportedException(String.format("Get value from `%s`", type.getName()));
+ }
+ Object result = currentRows[columnIndex - 1];
+ wasNull = null == result;
+ return result;
+ }
+
+ private Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) {
+ // TODO implement with calendar
+ Object result = currentRows[columnIndex - 1];
+ wasNull = null == result;
+ return result;
+ }
+
+ private InputStream getInputStream(final String type) throws SQLException {
+ throw new SQLFeatureNotSupportedException(String.format("Get input stream from `%s`", type));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/test/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutorTest.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/test/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutorTest.java
index d9af50cc223..0565719eb94 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/test/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutorTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/test/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutorTest.java
@@ -18,7 +18,10 @@
package org.apache.shardingsphere.infra.federation.executor.advanced;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
+import org.apache.shardingsphere.infra.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -38,6 +41,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -56,7 +60,8 @@ public class AdvancedFederationExecutorTest {
Map<String, ShardingSphereSchema> schemas = Collections.singletonMap(databaseName, new ShardingSphereSchema(tables));
ShardingSphereDatabase metaData = new ShardingSphereDatabase(schemaName, new H2DatabaseType(), mockResource(), null, schemas);
OptimizerContext optimizerContext = OptimizerContextFactory.create(Collections.singletonMap(schemaName, metaData), createGlobalRuleMetaData());
- executor = new AdvancedFederationExecutor(databaseName, schemaName, optimizerContext);
+ executor = new AdvancedFederationExecutor(databaseName, schemaName, optimizerContext, mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new Properties()),
+ mock(JDBCExecutor.class), mock(EventBusContext.class));
}
private ShardingSphereRuleMetaData createGlobalRuleMetaData() {
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
index 1c1e43af0ba..27eac3e72b9 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizer.java
@@ -36,6 +36,8 @@ public final class ShardingSphereOptimizer {
private final OptimizerContext context;
+ private final SqlToRelConverter converter;
+
/**
* Optimize query execution plan.
*
@@ -46,7 +48,6 @@ public final class ShardingSphereOptimizer {
*/
public RelNode optimize(final String databaseName, final String schemaName, final SQLStatement sqlStatement) {
try {
- SqlToRelConverter converter = context.getPlannerContexts().get(databaseName).getConverters().get(schemaName);
SqlNode sqlNode = SQLNodeConverterEngine.convertToSQLNode(sqlStatement);
RelNode logicPlan = converter.convertQuery(sqlNode, true, true).rel;
RelNode bestPlan = optimizeWithRBO(logicPlan, context.getPlannerContexts().get(databaseName).getHepPlanners().get(schemaName));
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
index ea0875e12f1..266a735c31d 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/context/planner/OptimizerPlannerContextFactory.java
@@ -93,20 +93,41 @@ public final class OptimizerPlannerContextFactory {
return new OptimizerPlannerContext(validators, converters, hepPlanners);
}
- private static Properties createConnectionProperties() {
+ /**
+ * Create connection properties with the Calcite time zone property set to UTC.
+ *
+ * @return new properties containing only the time zone setting
+ */
+ public static Properties createConnectionProperties() {
Properties result = new Properties();
result.setProperty(CalciteConnectionProperty.TIME_ZONE.camelName(), "UTC");
return result;
}
- private static CalciteCatalogReader createCatalogReader(final String schemaName, final Schema schema,
- final RelDataTypeFactory relDataTypeFactory, final CalciteConnectionConfig connectionConfig) {
+ /**
+ * Create catalog reader over a newly created root schema that contains the given schema.
+ *
+ * @param schemaName name under which the schema is added to the root schema
+ * @param schema schema to add to the root schema
+ * @param relDataTypeFactory rel data type factory passed to the catalog reader
+ * @param connectionConfig connection config passed to the catalog reader
+ * @return calcite catalog reader built on the new root schema
+ */
+ public static CalciteCatalogReader createCatalogReader(final String schemaName, final Schema schema, final RelDataTypeFactory relDataTypeFactory, final CalciteConnectionConfig connectionConfig) {
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
rootSchema.add(schemaName, schema);
return new CalciteCatalogReader(rootSchema, Collections.singletonList(schemaName), relDataTypeFactory, connectionConfig);
}
- private static SqlValidator createValidator(final CalciteCatalogReader catalogReader, final RelDataTypeFactory relDataTypeFactory, final CalciteConnectionConfig connectionConfig) {
+ /**
+ * Create validator.
+ *
+ * @param catalogReader catalog reader
+ * @param relDataTypeFactory rel data type factory
+ * @param connectionConfig connection config
+ * @return sql validator
+ */
+ public static SqlValidator createValidator(final CalciteCatalogReader catalogReader, final RelDataTypeFactory relDataTypeFactory, final CalciteConnectionConfig connectionConfig) {
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
.withLenientOperatorLookup(connectionConfig.lenientOperatorLookup())
.withSqlConformance(connectionConfig.conformance())
@@ -115,7 +136,15 @@ public final class OptimizerPlannerContextFactory {
return SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), catalogReader, relDataTypeFactory, validatorConfig);
}
- private static SqlToRelConverter createConverter(final CalciteCatalogReader catalogReader, final SqlValidator validator, final RelDataTypeFactory relDataTypeFactory) {
+ /**
+ * Create converter.
+ *
+ * @param catalogReader catalog reader
+ * @param validator validator
+ * @param relDataTypeFactory rel data type factory
+ * @return sql to rel converter
+ */
+ public static SqlToRelConverter createConverter(final CalciteCatalogReader catalogReader, final SqlValidator validator, final RelDataTypeFactory relDataTypeFactory) {
ViewExpander expander = (rowType, queryString, schemaPath, viewPath) -> null;
Config converterConfig = SqlToRelConverter.config().withTrimUnusedFields(true);
RelOptCluster cluster = RelOptCluster.create(QueryOptimizePlannerFactory.createVolcanoPlanner(), new RexBuilder(relDataTypeFactory));
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
index bc763329503..ded150bf181 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/planner/QueryOptimizePlannerFactory.java
@@ -28,7 +28,9 @@ import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
import java.util.Collection;
import java.util.LinkedList;
@@ -61,6 +63,7 @@ public final class QueryOptimizePlannerFactory {
HepProgramBuilder builder = new HepProgramBuilder();
builder.addGroupBegin().addRuleCollection(getSubQueryRules()).addGroupEnd().addMatchOrder(HepMatchOrder.DEPTH_FIRST);
builder.addGroupBegin().addRuleCollection(getFilterRules()).addGroupEnd().addMatchOrder(HepMatchOrder.BOTTOM_UP);
+ builder.addGroupBegin().addRuleCollection(getProjectRules()).addGroupEnd().addMatchOrder(HepMatchOrder.BOTTOM_UP);
builder.addMatchLimit(DEFAULT_MATCH_LIMIT);
return new HepPlanner(builder.build());
}
@@ -68,9 +71,6 @@ public final class QueryOptimizePlannerFactory {
private static void setUpRules(final RelOptPlanner planner) {
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(RelCollationTraitDef.INSTANCE);
- planner.addRule(CoreRules.FILTER_TO_CALC);
- planner.addRule(CoreRules.PROJECT_TO_CALC);
- planner.addRule(CoreRules.FILTER_INTO_JOIN);
planner.addRule(EnumerableRules.ENUMERABLE_CALC_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_JOIN_RULE);
@@ -88,20 +88,32 @@ public final class QueryOptimizePlannerFactory {
return result;
}
+ private static Collection<RelOptRule> getProjectRules() { // rule collection for the project rule group of the HEP program
+ Collection<RelOptRule> result = new LinkedList<>(); // LinkedList keeps the rules in the order added below
+ result.add(AggregateExpandDistinctAggregatesRule.Config.DEFAULT.toRule());
+ result.add(CoreRules.PROJECT_TO_CALC);
+ result.add(CoreRules.FILTER_TO_CALC);
+ result.add(CoreRules.PROJECT_CALC_MERGE);
+ result.add(CoreRules.FILTER_CALC_MERGE);
+ result.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE);
+ result.add(CoreRules.PROJECT_SET_OP_TRANSPOSE);
+ result.add(CoreRules.PROJECT_JOIN_TRANSPOSE);
+ result.add(CoreRules.PROJECT_WINDOW_TRANSPOSE);
+ result.add(CoreRules.PROJECT_FILTER_TRANSPOSE);
+ result.add(CoreRules.PROJECT_REDUCE_EXPRESSIONS);
+ result.add(ProjectRemoveRule.Config.DEFAULT.toRule());
+ return result;
+ }
+
private static Collection<RelOptRule> getFilterRules() {
Collection<RelOptRule> result = new LinkedList<>();
result.add(CoreRules.FILTER_INTO_JOIN);
result.add(CoreRules.JOIN_CONDITION_PUSH);
result.add(CoreRules.SORT_JOIN_TRANSPOSE);
- result.add(CoreRules.PROJECT_CORRELATE_TRANSPOSE);
result.add(CoreRules.FILTER_AGGREGATE_TRANSPOSE);
result.add(CoreRules.FILTER_PROJECT_TRANSPOSE);
result.add(CoreRules.FILTER_SET_OP_TRANSPOSE);
- result.add(CoreRules.FILTER_PROJECT_TRANSPOSE);
result.add(CoreRules.FILTER_REDUCE_EXPRESSIONS);
- result.add(CoreRules.PROJECT_REDUCE_EXPRESSIONS);
- result.add(CoreRules.FILTER_MERGE);
- result.add(CoreRules.PROJECT_CALC_MERGE);
result.add(CoreRules.JOIN_PUSH_EXPRESSIONS);
result.add(CoreRules.JOIN_PUSH_TRANSITIVE_PREDICATES);
return result;
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
index b5f0e1fefa7..63f09b94dc9 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/test/java/org/apache/shardingsphere/infra/federation/optimizer/ShardingSphereOptimizerTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.infra.federation.optimizer;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
+import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContextFactory;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResource;
@@ -101,7 +102,8 @@ public final class ShardingSphereOptimizerTest {
tables.put("t_user_info", createUserInfoTableMetaData());
ShardingSphereDatabase database = new ShardingSphereDatabase(databaseName,
new H2DatabaseType(), mockResource(), null, Collections.singletonMap(schemaName, new ShardingSphereSchema(tables)));
- optimizer = new ShardingSphereOptimizer(OptimizerContextFactory.create(Collections.singletonMap(databaseName, database), createGlobalRuleMetaData()));
+ OptimizerContext optimizerContext = OptimizerContextFactory.create(Collections.singletonMap(databaseName, database), createGlobalRuleMetaData());
+ optimizer = new ShardingSphereOptimizer(optimizerContext, optimizerContext.getPlannerContexts().get(databaseName).getConverters().get(schemaName));
}
private ShardingSphereRuleMetaData createGlobalRuleMetaData() {
@@ -135,14 +137,13 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_CROSS_JOIN_CONDITION, false);
String actual = optimizer.optimize(databaseName, schemaName, sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..4=[{inputs}], proj#0..1=[{exprs}], user_id0=[$t3])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], user_id1=[$t4], information=[$t5])" + LINE_SEPARATOR
- + " EnumerableHashJoin(condition=[=($3, $6)], joinType=[inner])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[=(CAST($0):INTEGER, 13)])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ String expected = "EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], user_id0=[$t4])" + LINE_SEPARATOR
+ + " EnumerableHashJoin(condition=[=($3, $6)], joinType=[inner])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):VARCHAR], expr#3=[CAST($t0):INTEGER], expr#4=[13], expr#5=[=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])"
+ + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
@@ -151,9 +152,8 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_WHERE_ALL_FIELDS, false);
String actual = optimizer.optimize(databaseName, schemaName, sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..1=[{inputs}], proj#0..1=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[=(CAST($0):INTEGER, 12)])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ String expected = "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):INTEGER], expr#3=[12], expr#4=[=($t2, $t3)], proj#0..1=[{exprs}], $condition=[$t4])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
@@ -162,9 +162,8 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_WHERE_SINGLE_FIELD, false);
String actual = optimizer.optimize(databaseName, schemaName, sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[=(CAST($0):INTEGER, 12)])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ String expected = "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):INTEGER], expr#3=[12], expr#4=[=($t2, $t3)], user_id=[$t0], $condition=[$t4])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
@@ -173,13 +172,12 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_CROSS_WHERE, false);
String actual = optimizer.optimize(databaseName, schemaName, sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..4=[{inputs}], proj#0..1=[{exprs}], user_id0=[$t3])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], user_id1=[$t4], information=[$t5])" + LINE_SEPARATOR
- + " EnumerableHashJoin(condition=[=($3, $6)], joinType=[inner])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ String expected = "EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], user_id0=[$t4])" + LINE_SEPARATOR
+ + " EnumerableHashJoin(condition=[=($3, $6)], joinType=[inner])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
@@ -202,14 +200,13 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_CROSS_WHERE_CONDITION, false);
String actual = optimizer.optimize(databaseName, schemaName, sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..4=[{inputs}], proj#0..1=[{exprs}], user_id0=[$t3])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], user_id1=[$t4], information=[$t5])" + LINE_SEPARATOR
- + " EnumerableHashJoin(condition=[=($3, $6)], joinType=[inner])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):VARCHAR], proj#0..2=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[=(CAST($0):INTEGER, 13)])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ String expected = "EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], user_id0=[$t4])" + LINE_SEPARATOR
+ + " EnumerableHashJoin(condition=[=($3, $6)], joinType=[inner])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..2=[{inputs}], expr#3=[CAST($t1):VARCHAR], proj#0..3=[{exprs}])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):VARCHAR], expr#3=[CAST($t0):INTEGER], expr#4=[13], expr#5=[=($t3, $t4)], proj#0..2=[{exprs}], $condition=[$t5])"
+ + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
@@ -218,9 +215,8 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_SUBQUERY_FROM, false);
String actual = optimizer.optimize(databaseName, schemaName, sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..1=[{inputs}], proj#0..1=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[>(CAST($0):INTEGER, 1)])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ String expected = "EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t0):INTEGER], expr#3=[1], expr#4=[>($t2, $t3)], proj#0..1=[{exprs}], $condition=[$t4])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
@@ -229,14 +225,13 @@ public final class ShardingSphereOptimizerTest {
ShardingSphereSQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine(DatabaseTypeEngine.getTrunkDatabaseTypeName(new H2DatabaseType()));
SQLStatement sqlStatement = sqlParserEngine.parse(SELECT_SUBQUERY_WHERE_EXIST, false);
String actual = optimizer.optimize(databaseName, schemaName, sqlStatement).explain();
- String expected = "EnumerableCalc(expr#0..3=[{inputs}], proj#0..1=[{exprs}])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[IS NOT NULL($3)])" + LINE_SEPARATOR
- + " EnumerableCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
- + " EnumerableAggregate(group=[{}], agg#0=[MIN($0)])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[true], $f0=[$t2])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[=(CAST($cor0.user_id):VARCHAR, CAST($0):VARCHAR)])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ String expected = "EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NOT NULL($t3)], proj#0..1=[{exprs}], $condition=[$t4])" + LINE_SEPARATOR
+ + " EnumerableCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{1}])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
+ + " EnumerableAggregate(group=[{}], agg#0=[MIN($0)])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[true], expr#3=[$cor0], expr#4=[$t3.user_id],"
+ + " expr#5=[CAST($t4):VARCHAR], expr#6=[CAST($t0):VARCHAR], expr#7=[=($t5, $t6)], $f0=[$t2], $condition=[$t7])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
@@ -264,13 +259,11 @@ public final class ShardingSphereOptimizerTest {
+ " EnumerableNestedLoopJoin(condition=[>=($1, $3)], joinType=[inner])" + LINE_SEPARATOR
+ " EnumerableTableScan(table=[[federate_jdbc, t_order_federate]])" + LINE_SEPARATOR
+ " EnumerableAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[=(CAST($1):VARCHAR, 'before')])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t1):VARCHAR], expr#3=['before':VARCHAR], expr#4=[=($t2, $t3)], user_id=[$t0], $condition=[$t4])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR
+ " EnumerableAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)])" + LINE_SEPARATOR
- + " EnumerableCalc(expr#0..1=[{inputs}], user_id=[$t0])" + LINE_SEPARATOR
- + " EnumerableFilter(condition=[=(CAST($1):VARCHAR, 'after')])" + LINE_SEPARATOR
- + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
+ + " EnumerableCalc(expr#0..1=[{inputs}], expr#2=[CAST($t1):VARCHAR], expr#3=['after':VARCHAR], expr#4=[=($t2, $t3)], user_id=[$t0], $condition=[$t4])" + LINE_SEPARATOR
+ + " EnumerableTableScan(table=[[federate_jdbc, t_user_info]])" + LINE_SEPARATOR;
assertThat(actual, is(expected));
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/enumerable/EnumerableMergedResult.java b/shardingsphere-infra/shardingsphere-infra-merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/enumerable/EnumerableMergedResult.java
deleted file mode 100644
index ba58d4888df..00000000000
--- a/shardingsphere-infra/shardingsphere-infra-merge/src/main/java/org/apache/shardingsphere/infra/merge/result/impl/enumerable/EnumerableMergedResult.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.merge.result.impl.enumerable;
-
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.shardingsphere.infra.merge.result.MergedResult;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectOutputStream;
-import java.sql.SQLException;
-import java.util.Calendar;
-
-/**
- * Enumerable merged result.
- */
-@RequiredArgsConstructor
-public final class EnumerableMergedResult implements MergedResult {
-
- private final Enumerable<Object[]> enumerableResult;
-
- private boolean wasNull;
-
- @Override
- public boolean next() throws SQLException {
- return enumerableResult.enumerator().moveNext();
- }
-
- @Override
- public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
- Object result = enumerableResult.enumerator().current()[columnIndex - 1];
- wasNull = null == result;
- return result;
- }
-
- @Override
- public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
- Object result = enumerableResult.enumerator().current()[columnIndex - 1];
- wasNull = null == result;
- return result;
- }
-
- @Override
- public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
- return getInputStream(enumerableResult.enumerator().current()[columnIndex - 1]);
- }
-
- @SneakyThrows(IOException.class)
- private InputStream getInputStream(final Object value) {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
- objectOutputStream.writeObject(value);
- objectOutputStream.flush();
- objectOutputStream.close();
- return new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
- }
-
- @Override
- public boolean wasNull() throws SQLException {
- return wasNull;
- }
-}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/DatabaseMetaDataResultSet.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/DatabaseMetaDataResultSet.java
index 30577b0e4ab..2ae9729f4d7 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/DatabaseMetaDataResultSet.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/DatabaseMetaDataResultSet.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.driver.jdbc.core.resultset;
import lombok.EqualsAndHashCode;
import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedDatabaseMetaDataResultSet;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
index 74295b4e811..d7f4e604004 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/resultset/ShardingSphereResultSet.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.driver.jdbc.core.resultset;
import org.apache.commons.collections4.map.CaseInsensitiveMap;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractResultSetAdapter;
import org.apache.shardingsphere.driver.jdbc.exception.SQLExceptionErrorCode;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
import org.apache.shardingsphere.infra.binder.segment.select.projection.DerivedColumn;
import org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
@@ -419,10 +420,10 @@ public final class ShardingSphereResultSet extends AbstractResultSetAdapter {
}
private Integer getIndexFromColumnLabelAndIndexMap(final String columnLabel) throws SQLFeatureNotSupportedException {
- Integer columnIndex = columnLabelAndIndexMap.get(columnLabel);
- if (null == columnIndex) {
+ Integer result = columnLabelAndIndexMap.get(columnLabel);
+ if (null == result) {
throw new SQLFeatureNotSupportedException(String.format("can't get index from columnLabel[%s].", columnLabel));
}
- return columnIndex;
+ return result;
}
}