You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/06/09 06:11:05 UTC
[shardingsphere] branch master updated: fix sql join error with
multi unbinding sharding table (#10728)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4f9c0e4 fix sql join error with multi unbinding sharding table (#10728)
4f9c0e4 is described below
commit 4f9c0e42dc15fd2b879f7f3c93369867499d2159
Author: Zhengqiang Duan <st...@gmail.com>
AuthorDate: Wed Jun 9 14:10:19 2021 +0800
fix sql join error with multi unbinding sharding table (#10728)
---
.../sql/federate/execute/FederateExecutor.java | 5 ++++-
.../sql/federate/execute/FederateJDBCExecutor.java | 25 +++++++++++++---------
.../federate/execute/raw/FederateRawExecutor.java | 4 +++-
.../federate/schema/row/FederateRowExecutor.java | 10 ++++-----
.../statement/ShardingSpherePreparedStatement.java | 10 +++------
.../core/statement/ShardingSphereStatement.java | 2 +-
.../backend/communication/ProxySQLExecutor.java | 7 +++---
7 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
index 58da05e..f646d34 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -36,10 +37,12 @@ public interface FederateExecutor {
*
* @param executionContext execution context
* @param callback callback
+ * @param type JDBC driver type
+ * @param statementOption statement option
* @return execute result
* @throws SQLException SQL exception
*/
- List<QueryResult> executeQuery(ExecutionContext executionContext, JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException;
+ List<QueryResult> executeQuery(ExecutionContext executionContext, JDBCExecutorCallback<? extends ExecuteResult> callback, String type, StatementOption statementOption) throws SQLException;
/**
* Close.
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
index 824b275..84eed4d 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.dr
import org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
import org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
@@ -87,17 +88,18 @@ public final class FederateJDBCExecutor implements FederateExecutor {
}
@Override
- public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
- QueryResult result = new JDBCStreamQueryResult(execute(executionContext, callback));
+ public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback,
+ final String type, final StatementOption statementOption) throws SQLException {
+ QueryResult result = new JDBCStreamQueryResult(execute(executionContext, callback, type, statementOption));
return Collections.singletonList(result);
}
@Override
public void close() throws SQLException {
- if (null != statement) {
+ if (null != statement && !statement.isClosed()) {
Connection connection = statement.getConnection();
- connection.close();
statement.close();
+ connection.close();
}
}
@@ -106,18 +108,20 @@ public final class FederateJDBCExecutor implements FederateExecutor {
return statement.getResultSet();
}
- private ResultSet execute(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+ private ResultSet execute(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback,
+ final String type, final StatementOption statementOption) throws SQLException {
SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
- PreparedStatement statement = getConnection(executionContext, callback).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
+ PreparedStatement statement = getConnection(executionContext, callback, type, statementOption).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
setParameters(statement, sqlUnit.getParameters());
this.statement = statement;
return statement.executeQuery();
}
- private Connection getConnection(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+ private Connection getConnection(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback,
+ final String type, final StatementOption statementOption) throws SQLException {
Connection result = DriverManager.getConnection(CONNECTION_URL, getProperties());
CalciteConnection calciteConnection = result.unwrap(CalciteConnection.class);
- addSchema(calciteConnection, executionContext, callback);
+ addSchema(calciteConnection, executionContext, callback, type, statementOption);
return result;
}
@@ -128,8 +132,9 @@ public final class FederateJDBCExecutor implements FederateExecutor {
return result;
}
- private void addSchema(final CalciteConnection calciteConnection, final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
- FederateRowExecutor executor = new FederateRowExecutor(rules, props, jdbcManager, jdbcExecutor, executionContext, callback);
+ private void addSchema(final CalciteConnection calciteConnection, final ExecutionContext executionContext,
+ final JDBCExecutorCallback<? extends ExecuteResult> callback, final String type, final StatementOption statementOption) throws SQLException {
+ FederateRowExecutor executor = new FederateRowExecutor(rules, props, jdbcManager, jdbcExecutor, executionContext, callback, type, statementOption);
FederateLogicSchema logicSchema = new FederateLogicSchema(factory.getSchemaMetadatas().getSchemaMetadataBySchemaName(schema), executor);
calciteConnection.getRootSchema().add(schema, logicSchema);
calciteConnection.setSchema(schema);
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
index 112f797..33721cd 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.optimize.ShardingSphereOptimizer;
import org.apache.shardingsphere.infra.optimize.context.OptimizeContext;
@@ -50,7 +51,8 @@ public final class FederateRawExecutor implements FederateExecutor {
}
@Override
- public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback) throws SQLException {
+ public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback,
+ final String type, final StatementOption statementOption) throws SQLException {
// TODO
return Collections.emptyList();
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
index ef40fdc..efd50ef 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.infra.executor.sql.federate.schema.table.genera
import org.apache.shardingsphere.infra.executor.sql.federate.schema.table.generator.FederateExecutionSQLGenerator;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
@@ -65,6 +64,10 @@ public final class FederateRowExecutor {
private final JDBCExecutorCallback<? extends ExecuteResult> callback;
+ private final String type;
+
+ private final StatementOption statementOption;
+
/**
* Execute.
*
@@ -94,12 +97,9 @@ public final class FederateRowExecutor {
}
private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
- // TODO Set parameters for StatementOption
int maxConnectionsSizePerQuery = props.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
Collection<ExecutionUnit> executionUnits = executionContext.getExecutionUnits();
- String type = executionUnits.stream().anyMatch(each -> !each.getSqlUnit().getParameters().isEmpty()) ? JDBCDriverType.PREPARED_STATEMENT : JDBCDriverType.STATEMENT;
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
- type, maxConnectionsSizePerQuery, jdbcManager, new StatementOption(true), rules);
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(type, maxConnectionsSizePerQuery, jdbcManager, statementOption, rules);
return prepareEngine.prepare(executionContext.getRouteContext(), executionUnits);
}
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index bc334af..ad98836 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
-import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.constant.SQLExceptionConstant;
@@ -213,12 +212,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
if (executionContext.getExecutionUnits().isEmpty()) {
return Collections.emptyList();
}
- // TODO : Please fix me here.
- // PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
- // sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
- executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
- return federateExecutor.executeQuery(executionContext, callback);
+ PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+ sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
+ return federateExecutor.executeQuery(executionContext, callback, JDBCDriverType.PREPARED_STATEMENT, statementOption);
}
@Override
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index c5c489b..396ca52 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -185,7 +185,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
}
StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
- return federateExecutor.executeQuery(executionContext, callback);
+ return federateExecutor.executeQuery(executionContext, callback, JDBCDriverType.STATEMENT, statementOption);
}
@Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index a71eb60..68fbf60 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -123,7 +123,7 @@ public final class ProxySQLExecutor {
return rawExecute(executionContext, rules, maxConnectionsSizePerQuery);
}
if (executionContext.getRouteContext().isFederated()) {
- return federateExecute(executionContext, rules, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
+ return federateExecute(executionContext, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
}
return useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
}
@@ -140,8 +140,7 @@ public final class ProxySQLExecutor {
return rawExecutor.execute(executionGroupContext, executionContext.getSqlStatementContext(), new RawSQLExecutorCallback());
}
- private Collection<ExecuteResult> federateExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
- final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
+ private Collection<ExecuteResult> federateExecute(final ExecutionContext executionContext, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
if (executionContext.getExecutionUnits().isEmpty()) {
return Collections.emptyList();
}
@@ -149,7 +148,7 @@ public final class ProxySQLExecutor {
ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(type, metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(),
executionContext.getSqlStatementContext().getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
backendConnection.setFederateExecutor(federateExecutor);
- return federateExecutor.executeQuery(executionContext, callback).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
+ return federateExecutor.executeQuery(executionContext, callback, type, new StatementOption(isReturnGeneratedKeys)).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
}
private Collection<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,