You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2023/03/14 07:33:35 UTC
[shardingsphere] branch master updated: Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control (#24587)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4f1006e33d5 Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control (#24587)
4f1006e33d5 is described below
commit 4f1006e33d587dde6d5d0ab76210099015136cc0
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Tue Mar 14 15:33:20 2023 +0800
Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control (#24587)
* Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control
* Add TODO comment
---
.../proxy/backend/connector/DatabaseConnector.java | 57 +++++++++++++++++++++-
.../proxy/backend/connector/ProxySQLExecutor.java | 33 -------------
2 files changed, 56 insertions(+), 34 deletions(-)
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 56a9f3aa1af..b581068c0b1 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.proxy.backend.connector;
import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.binder.QueryContext;
import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
import org.apache.shardingsphere.infra.binder.decider.context.SQLFederationDeciderContext;
@@ -55,6 +56,7 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallback;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
+import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
@@ -68,11 +70,16 @@ import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
import org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutor;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
+import org.apache.shardingsphere.transaction.api.TransactionType;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -158,7 +165,6 @@ public final class DatabaseConnector implements DatabaseBackendHandler {
* @return backend response
* @throws SQLException SQL exception
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
@Override
public ResponseHeader execute() throws SQLException {
MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
@@ -169,8 +175,57 @@ public final class DatabaseConnector implements DatabaseBackendHandler {
ResultSet resultSet = doExecuteFederation(queryContext, metaDataContexts);
return processExecuteFederation(resultSet, metaDataContexts);
}
+ Collection<ExecutionContext> executionContexts = generateExecutionContexts(metaDataContexts);
+ return isNeedImplicitCommitTransaction(executionContexts.iterator().next()) ? doExecuteWithImplicitCommitTransaction(executionContexts) : doExecute(executionContexts);
+ }
+
+ private Collection<ExecutionContext> generateExecutionContexts(final MetaDataContexts metaDataContexts) {
+ Collection<ExecutionContext> result = new LinkedList<>();
ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData(),
metaDataContexts.getMetaData().getProps(), backendConnection.getConnectionSession().getConnectionContext());
+ result.add(executionContext);
+ // TODO support logical SQL optimize to generate multiple logical SQL
+ return result;
+ }
+
+ private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
+ TransactionStatus transactionStatus = backendConnection.getConnectionSession().getTransactionStatus();
+ SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+ return TransactionType.isDistributedTransaction(transactionStatus.getTransactionType()) && !transactionStatus.isInTransaction() && sqlStatement instanceof DMLStatement
+ && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
+ }
+
+ private ResponseHeader doExecuteWithImplicitCommitTransaction(final Collection<ExecutionContext> executionContexts) throws SQLException {
+ ResponseHeader result;
+ BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
+ try {
+ transactionManager.begin();
+ result = doExecute(executionContexts);
+ transactionManager.commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ transactionManager.rollback();
+ String databaseName = backendConnection.getConnectionSession().getDatabaseName();
+ throw SQLExceptionTransformEngine.toSQLException(ex, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()
+ .getDatabase(databaseName).getProtocolType().getType());
+ }
+ return result;
+ }
+
+ private ResponseHeader doExecute(final Collection<ExecutionContext> executionContexts) throws SQLException {
+ ResponseHeader result = null;
+ for (ExecutionContext each : executionContexts) {
+ ResponseHeader responseHeader = doExecute(each);
+ if (null == result) {
+ result = responseHeader;
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
if (executionContext.getExecutionUnits().isEmpty()) {
return new UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
index 6d1d55e7509..fa9daaf14d2 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.proxy.backend.connector;
-import org.apache.shardingsphere.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.dialect.exception.transaction.TableModifyInTransactionException;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
@@ -42,7 +41,6 @@ import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
-import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.connector.sane.SaneQueryResultEngine;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -53,8 +51,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatemen
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.FetchStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.MoveStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.TruncateStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.OpenGaussStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
@@ -145,35 +141,6 @@ public final class ProxySQLExecutor {
* @throws SQLException SQL exception
*/
public List<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
- return isNeedImplicitCommitTransaction(executionContext) ? doExecuteWithImplicitCommitTransaction(executionContext) : doExecute(executionContext);
- }
-
- private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
- TransactionStatus transactionStatus = backendConnection.getConnectionSession().getTransactionStatus();
- SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
- return TransactionType.isDistributedTransaction(transactionStatus.getTransactionType()) && !transactionStatus.isInTransaction() && sqlStatement instanceof DMLStatement
- && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
- }
-
- private List<ExecuteResult> doExecuteWithImplicitCommitTransaction(final ExecutionContext executionContext) throws SQLException {
- List<ExecuteResult> result;
- BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
- try {
- transactionManager.begin();
- result = doExecute(executionContext);
- transactionManager.commit();
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- transactionManager.rollback();
- String databaseName = backendConnection.getConnectionSession().getDatabaseName();
- throw SQLExceptionTransformEngine.toSQLException(ex, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()
- .getDatabase(databaseName).getProtocolType().getType());
- }
- return result;
- }
-
- private List<ExecuteResult> doExecute(final ExecutionContext executionContext) throws SQLException {
String databaseName = backendConnection.getConnectionSession().getDatabaseName();
Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
int maxConnectionsSizePerQuery = ProxyContext.getInstance()