You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/29 04:19:48 UTC

[shardingsphere] branch master updated: Fix openGauss cursor execution in xa transaction(#21250) (#21257)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a467bbe1668 Fix openGauss cursor execution in xa transaction(#21250) (#21257)
a467bbe1668 is described below

commit a467bbe1668c2c300d3c91236e9e213c565ab7bf
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Thu Sep 29 12:19:38 2022 +0800

    Fix openGauss cursor execution in xa transaction(#21250) (#21257)
    
    * Fix openGauss cursor execution in xa transaction
    
    * Fix
    
    * Fix method name
---
 .../backend/communication/ProxySQLExecutor.java    | 10 +++++--
 .../communication/ReactiveProxySQLExecutor.java    | 23 ++++++++++----
 .../communication/ProxySQLExecutorTest.java        | 35 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 8 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index fb1bc24b6da..ff69c09ef9a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -105,23 +105,27 @@ public final class ProxySQLExecutor {
     
     private boolean isExecuteDDLInXATransaction(final SQLStatement sqlStatement) {
         TransactionStatus transactionStatus = backendConnection.getConnectionSession().getTransactionStatus();
-        return TransactionType.XA == transactionStatus.getTransactionType() && isUnsupportedDDLStatement(sqlStatement) && transactionStatus.isInTransaction();
+        return TransactionType.XA == transactionStatus.getTransactionType() && transactionStatus.isInTransaction() && isUnsupportedDDLStatement(sqlStatement);
     }
     
     private boolean isExecuteDDLInPostgreSQLOpenGaussTransaction(final SQLStatement sqlStatement) {
         // TODO implement DDL statement commit/rollback in PostgreSQL/openGauss transaction
         boolean isPostgreSQLOpenGaussStatement = isPostgreSQLOrOpenGaussStatement(sqlStatement);
-        boolean isSupportedStatement = isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
+        boolean isSupportedStatement = isSupportedSQLStatement(sqlStatement);
         return sqlStatement instanceof DDLStatement && !isSupportedStatement && isPostgreSQLOpenGaussStatement && backendConnection.getConnectionSession().getTransactionStatus().isInTransaction();
     }
     
+    private boolean isSupportedSQLStatement(final SQLStatement sqlStatement) {
+        return isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
+    }
+    
     private boolean isCursorStatement(final SQLStatement sqlStatement) {
         return sqlStatement instanceof OpenGaussCursorStatement
                 || sqlStatement instanceof CloseStatement || sqlStatement instanceof MoveStatement || sqlStatement instanceof FetchStatement;
     }
     
     private boolean isUnsupportedDDLStatement(final SQLStatement sqlStatement) {
-        if (isPostgreSQLOrOpenGaussStatement(sqlStatement) && sqlStatement instanceof TruncateStatement) {
+        if (isPostgreSQLOrOpenGaussStatement(sqlStatement) && isSupportedSQLStatement(sqlStatement)) {
             return false;
         }
         return sqlStatement instanceof DDLStatement;
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
index 759609f4156..542fec4490a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ReactiveProxySQLExecutor.java
@@ -39,9 +39,13 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.dialect.exception.transaction.TableModifyInTransactionException;
 import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CloseStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.FetchStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.MoveStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.TruncateStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.OpenGaussStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.PostgreSQLStatement;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 
@@ -84,23 +88,32 @@ public final class ReactiveProxySQLExecutor {
     
     private boolean isExecuteDDLInXATransaction(final SQLStatement sqlStatement) {
         TransactionStatus transactionStatus = backendConnection.getConnectionSession().getTransactionStatus();
-        return TransactionType.XA == transactionStatus.getTransactionType() && isUnsupportedDDLStatement(sqlStatement) && transactionStatus.isInTransaction();
+        return TransactionType.XA == transactionStatus.getTransactionType() && transactionStatus.isInTransaction() && isUnsupportedDDLStatement(sqlStatement);
     }
     
     private boolean isExecuteDDLInPostgreSQLOpenGaussTransaction(final SQLStatement sqlStatement) {
         // TODO implement DDL statement commit/rollback in PostgreSQL/openGauss transaction
-        boolean isPostgreSQLOpenGaussStatement = sqlStatement instanceof PostgreSQLStatement || sqlStatement instanceof OpenGaussStatement;
-        boolean isSupportedDDLStatement = sqlStatement instanceof TruncateStatement;
-        return sqlStatement instanceof DDLStatement && !isSupportedDDLStatement && isPostgreSQLOpenGaussStatement && backendConnection.getConnectionSession().getTransactionStatus().isInTransaction();
+        boolean isPostgreSQLOpenGaussStatement = isPostgreSQLOrOpenGaussStatement(sqlStatement);
+        boolean isSupportedStatement = isSupportedSQLStatement(sqlStatement);
+        return sqlStatement instanceof DDLStatement && !isSupportedStatement && isPostgreSQLOpenGaussStatement && backendConnection.getConnectionSession().getTransactionStatus().isInTransaction();
     }
     
     private boolean isUnsupportedDDLStatement(final SQLStatement sqlStatement) {
-        if (isPostgreSQLOrOpenGaussStatement(sqlStatement) && sqlStatement instanceof TruncateStatement) {
+        if (isPostgreSQLOrOpenGaussStatement(sqlStatement) && isSupportedSQLStatement(sqlStatement)) {
             return false;
         }
         return sqlStatement instanceof DDLStatement;
     }
     
+    private boolean isSupportedSQLStatement(final SQLStatement sqlStatement) {
+        return isCursorStatement(sqlStatement) || sqlStatement instanceof TruncateStatement;
+    }
+    
+    private boolean isCursorStatement(final SQLStatement sqlStatement) {
+        return sqlStatement instanceof OpenGaussCursorStatement
+                || sqlStatement instanceof CloseStatement || sqlStatement instanceof MoveStatement || sqlStatement instanceof FetchStatement;
+    }
+    
     private boolean isPostgreSQLOrOpenGaussStatement(final SQLStatement sqlStatement) {
         return sqlStatement instanceof PostgreSQLStatement || sqlStatement instanceof OpenGaussStatement;
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutorTest.java
index 99684cfecc1..aab2ab9b4b6 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutorTest.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.proxy.backend.communication;
 import org.apache.shardingsphere.infra.binder.QueryContext;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.ddl.CreateTableStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.ddl.CursorStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.ddl.TruncateStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.database.DefaultDatabase;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
@@ -39,12 +41,16 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.dialect.exception.transaction.TableModifyInTransactionException;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.backend.util.ProxyContextRestorer;
+import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.item.ProjectionsSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.SimpleTableSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.table.TableNameSegment;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.ddl.MySQLCreateTableStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.ddl.MySQLTruncateStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.opengauss.ddl.OpenGaussCursorStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.ddl.PostgreSQLCreateTableStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.ddl.PostgreSQLTruncateStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.postgresql.dml.PostgreSQLInsertStatement;
@@ -142,6 +148,14 @@ public final class ProxySQLExecutorTest extends ProxyContextRestorer {
         new ProxySQLExecutor(JDBCDriverType.STATEMENT, backendConnection, mock(JDBCDatabaseCommunicationEngine.class)).checkExecutePrerequisites(executionContext);
     }
     
+    @Test
+    public void assertCheckExecutePrerequisitesWhenExecuteCursorInPostgreSQLTransaction() {
+        when(connectionSession.getTransactionStatus().getTransactionType()).thenReturn(TransactionType.LOCAL);
+        ExecutionContext executionContext = new ExecutionContext(
+                new QueryContext(createCursorStatementContext(), "", Collections.emptyList()), Collections.emptyList(), mock(RouteContext.class));
+        new ProxySQLExecutor(JDBCDriverType.STATEMENT, backendConnection, mock(JDBCDatabaseCommunicationEngine.class)).checkExecutePrerequisites(executionContext);
+    }
+    
     @Test
     public void assertCheckExecutePrerequisitesWhenExecuteDMLInPostgreSQLTransaction() {
         when(connectionSession.getTransactionStatus().getTransactionType()).thenReturn(TransactionType.LOCAL);
@@ -185,6 +199,27 @@ public final class ProxySQLExecutorTest extends ProxyContextRestorer {
         return new TruncateStatementContext(sqlStatement);
     }
     
+    private CursorStatementContext createCursorStatementContext() {
+        CursorStatementContext result = mock(CursorStatementContext.class, RETURNS_DEEP_STUBS);
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
+        SelectStatement selectStatement = createSelectStatement();
+        selectStatement.setProjections(new ProjectionsSegment(0, 0));
+        OpenGaussCursorStatement sqlStatement = new OpenGaussCursorStatement();
+        sqlStatement.setSelect(selectStatement);
+        SelectStatementContext selectStatementContext = new SelectStatementContext(Collections.singletonMap(DefaultDatabase.LOGIC_NAME, database), Collections.emptyList(),
+                selectStatement, DefaultDatabase.LOGIC_NAME);
+        when(result.getSelectStatementContext()).thenReturn(selectStatementContext);
+        when(result.getSqlStatement()).thenReturn(sqlStatement);
+        return result;
+    }
+    
+    private SelectStatement createSelectStatement() {
+        SelectStatement result = new MySQLSelectStatement();
+        result.setFrom(new SimpleTableSegment(new TableNameSegment(10, 13, new IdentifierValue("tbl"))));
+        result.setProjections(new ProjectionsSegment(0, 0));
+        return result;
+    }
+    
     private InsertStatementContext createMySQLInsertStatementContext() {
         MySQLInsertStatement sqlStatement = new MySQLInsertStatement();
         sqlStatement.setTable(new SimpleTableSegment(new TableNameSegment(0, 0, new IdentifierValue("t_order"))));