You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2023/03/14 07:33:35 UTC

[shardingsphere] branch master updated: Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control (#24587)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f1006e33d5 Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control (#24587)
4f1006e33d5 is described below

commit 4f1006e33d587dde6d5d0ab76210099015136cc0
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Tue Mar 14 15:33:20 2023 +0800

    Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control (#24587)
    
    * Refactor DatabaseConnector and ProxySQLExecutor for multi logic sql transaction control
    
    * add todo comment
---
 .../proxy/backend/connector/DatabaseConnector.java | 57 +++++++++++++++++++++-
 .../proxy/backend/connector/ProxySQLExecutor.java  | 33 -------------
 2 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 56a9f3aa1af..b581068c0b1 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.proxy.backend.connector;
 
 import com.google.common.base.Preconditions;
+import org.apache.shardingsphere.dialect.SQLExceptionTransformEngine;
 import org.apache.shardingsphere.infra.binder.QueryContext;
 import org.apache.shardingsphere.infra.binder.aware.CursorDefinitionAware;
 import org.apache.shardingsphere.infra.binder.decider.context.SQLFederationDeciderContext;
@@ -55,6 +56,7 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallback;
 import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
 import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
+import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.exception.RuleNotExistedException;
@@ -68,11 +70,16 @@ import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader
 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
 import org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule;
 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutor;
 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
+import org.apache.shardingsphere.transaction.api.TransactionType;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -158,7 +165,6 @@ public final class DatabaseConnector implements DatabaseBackendHandler {
      * @return backend response
      * @throws SQLException SQL exception
      */
-    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public ResponseHeader execute() throws SQLException {
         MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
@@ -169,8 +175,57 @@ public final class DatabaseConnector implements DatabaseBackendHandler {
             ResultSet resultSet = doExecuteFederation(queryContext, metaDataContexts);
             return processExecuteFederation(resultSet, metaDataContexts);
         }
+        Collection<ExecutionContext> executionContexts = generateExecutionContexts(metaDataContexts);
+        return isNeedImplicitCommitTransaction(executionContexts.iterator().next()) ? doExecuteWithImplicitCommitTransaction(executionContexts) : doExecute(executionContexts);
+    }
+    
+    private Collection<ExecutionContext> generateExecutionContexts(final MetaDataContexts metaDataContexts) {
+        Collection<ExecutionContext> result = new LinkedList<>();
         ExecutionContext executionContext = new KernelProcessor().generateExecutionContext(queryContext, database, metaDataContexts.getMetaData().getGlobalRuleMetaData(),
                 metaDataContexts.getMetaData().getProps(), backendConnection.getConnectionSession().getConnectionContext());
+        result.add(executionContext);
+        // TODO support logical SQL optimize to generate multiple logical SQL
+        return result;
+    }
+    
+    private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
+        TransactionStatus transactionStatus = backendConnection.getConnectionSession().getTransactionStatus();
+        SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+        return TransactionType.isDistributedTransaction(transactionStatus.getTransactionType()) && !transactionStatus.isInTransaction() && sqlStatement instanceof DMLStatement
+                && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
+    }
+    
+    private ResponseHeader doExecuteWithImplicitCommitTransaction(final Collection<ExecutionContext> executionContexts) throws SQLException {
+        ResponseHeader result;
+        BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
+        try {
+            transactionManager.begin();
+            result = doExecute(executionContexts);
+            transactionManager.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            transactionManager.rollback();
+            String databaseName = backendConnection.getConnectionSession().getDatabaseName();
+            throw SQLExceptionTransformEngine.toSQLException(ex, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()
+                    .getDatabase(databaseName).getProtocolType().getType());
+        }
+        return result;
+    }
+    
+    private ResponseHeader doExecute(final Collection<ExecutionContext> executionContexts) throws SQLException {
+        ResponseHeader result = null;
+        for (ExecutionContext each : executionContexts) {
+            ResponseHeader responseHeader = doExecute(each);
+            if (null == result) {
+                result = responseHeader;
+            }
+        }
+        return result;
+    }
+    
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private ResponseHeader doExecute(final ExecutionContext executionContext) throws SQLException {
         if (executionContext.getExecutionUnits().isEmpty()) {
             return new UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
         }
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
index 6d1d55e7509..fa9daaf14d2 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.proxy.backend.connector;
 
-import org.apache.shardingsphere.dialect.SQLExceptionTransformEngine;
 import org.apache.shardingsphere.dialect.exception.transaction.TableModifyInTransactionException;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
@@ -42,7 +41,6 @@ import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.proxy.backend.connector.jdbc.executor.ProxyJDBCExecutor;
 import org.apache.shardingsphere.proxy.backend.connector.jdbc.statement.JDBCBackendStatement;
-import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.connector.sane.SaneQueryResultEngine;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -53,8 +51,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatemen
 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.FetchStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.MoveStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.TruncateStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.OpenGaussStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
@@ -145,35 +141,6 @@ public final class ProxySQLExecutor {
      * @throws SQLException SQL exception
      */
     public List<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
-        return isNeedImplicitCommitTransaction(executionContext) ? doExecuteWithImplicitCommitTransaction(executionContext) : doExecute(executionContext);
-    }
-    
-    private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
-        TransactionStatus transactionStatus = backendConnection.getConnectionSession().getTransactionStatus();
-        SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
-        return TransactionType.isDistributedTransaction(transactionStatus.getTransactionType()) && !transactionStatus.isInTransaction() && sqlStatement instanceof DMLStatement
-                && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
-    }
-    
-    private List<ExecuteResult> doExecuteWithImplicitCommitTransaction(final ExecutionContext executionContext) throws SQLException {
-        List<ExecuteResult> result;
-        BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
-        try {
-            transactionManager.begin();
-            result = doExecute(executionContext);
-            transactionManager.commit();
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            transactionManager.rollback();
-            String databaseName = backendConnection.getConnectionSession().getDatabaseName();
-            throw SQLExceptionTransformEngine.toSQLException(ex, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData()
-                    .getDatabase(databaseName).getProtocolType().getType());
-        }
-        return result;
-    }
-    
-    private List<ExecuteResult> doExecute(final ExecutionContext executionContext) throws SQLException {
         String databaseName = backendConnection.getConnectionSession().getDatabaseName();
         Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules();
         int maxConnectionsSizePerQuery = ProxyContext.getInstance()