You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/11/19 05:17:43 UTC
[shardingsphere] branch master updated: Fix executing prepared TCL statement cause inconsistent transaction status (#13682)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 29d568b Fix executing prepared TCL statement cause inconsistent transaction status (#13682)
29d568b is described below
commit 29d568ba65b5cf4691d2fb8a799340585b5ec3a6
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Fri Nov 19 13:16:44 2021 +0800
Fix executing prepared TCL statement cause inconsistent transaction status (#13682)
* Fix MySQL Proxy doesn't support prepared TCL statement
* Fix checkstyle
* Complete test cases
---
.../execute/MySQLComStmtExecuteExecutor.java | 28 ++++++++++++++++++----
.../execute/MySQLComStmtExecuteExecutorTest.java | 16 +++++++++++++
2 files changed, 39 insertions(+), 5 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index ec60d28..a8c7cd5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -44,10 +44,13 @@ import org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryRes
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
+import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
import java.sql.SQLException;
import java.util.Collection;
@@ -62,6 +65,8 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
private final DatabaseCommunicationEngine databaseCommunicationEngine;
+ private final TextProtocolBackendHandler textProtocolBackendHandler;
+
private final int characterSet;
@Getter
@@ -69,7 +74,7 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
private int currentSequenceId;
- public MySQLComStmtExecuteExecutor(final MySQLComStmtExecutePacket packet, final BackendConnection backendConnection) {
+ public MySQLComStmtExecuteExecutor(final MySQLComStmtExecutePacket packet, final BackendConnection backendConnection) throws SQLException {
String schemaName = backendConnection.getSchemaName();
MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(DatabaseTypeRegistry.getTrunkDatabaseTypeName(
@@ -82,8 +87,16 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
((TableAvailable) sqlStatementContext).getTablesContext().getSchemaName().ifPresent(SQLStatementSchemaHolder::set);
}
SQLCheckEngine.check(sqlStatement, Collections.emptyList(), getRules(schemaName), schemaName, metaDataContexts.getMetaDataMap(), backendConnection.getGrantee());
- databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatementContext, packet.getSql(), packet.getParameters(), backendConnection);
characterSet = backendConnection.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
+ // TODO Refactor the following branch
+ if (sqlStatement instanceof TCLStatement) {
+ databaseCommunicationEngine = null;
+ textProtocolBackendHandler =
+ TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("MySQL"), packet.getSql(), backendConnection);
+ return;
+ }
+ textProtocolBackendHandler = null;
+ databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatementContext, packet.getSql(), packet.getParameters(), backendConnection);
}
private static Collection<ShardingSphereRule> getRules(final String schemaName) {
@@ -95,7 +108,7 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
- ResponseHeader responseHeader = databaseCommunicationEngine.execute();
+ ResponseHeader responseHeader = null != databaseCommunicationEngine ? databaseCommunicationEngine.execute() : textProtocolBackendHandler.execute();
return responseHeader instanceof QueryResponseHeader ? processQuery((QueryResponseHeader) responseHeader) : processUpdate((UpdateResponseHeader) responseHeader);
}
@@ -113,7 +126,7 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
@Override
public boolean next() throws SQLException {
- return databaseCommunicationEngine.next();
+ return null != databaseCommunicationEngine && databaseCommunicationEngine.next();
}
@Override
@@ -129,6 +142,11 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
@Override
public void close() throws SQLException {
- databaseCommunicationEngine.close();
+ if (null != databaseCommunicationEngine) {
+ databaseCommunicationEngine.close();
+ }
+ if (null != textProtocolBackendHandler) {
+ textProtocolBackendHandler.close();
+ }
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
index 00311b5..edb19e6 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
@@ -35,8 +35,10 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -112,4 +114,18 @@ public final class MySQLComStmtExecuteExecutorTest {
mysqlComStmtExecuteExecutor.execute();
assertThat(mysqlComStmtExecuteExecutor.getResponseType(), is(ResponseType.UPDATE));
}
+
+ @Test
+ public void assertExecutePreparedCommit() throws SQLException, NoSuchFieldException {
+ when(backendConnection.getSchemaName()).thenReturn("logic_db");
+ when(backendConnection.getDefaultSchemaName()).thenReturn("logic_db");
+ MySQLComStmtExecutePacket packet = mock(MySQLComStmtExecutePacket.class);
+ when(packet.getSql()).thenReturn("commit");
+ MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new MySQLComStmtExecuteExecutor(packet, backendConnection);
+ TextProtocolBackendHandler textProtocolBackendHandler = mock(TextProtocolBackendHandler.class);
+ FieldSetter.setField(mysqlComStmtExecuteExecutor, MySQLComStmtExecuteExecutor.class.getDeclaredField("textProtocolBackendHandler"), textProtocolBackendHandler);
+ when(textProtocolBackendHandler.execute()).thenReturn(new UpdateResponseHeader(mock(CommitStatement.class)));
+ mysqlComStmtExecuteExecutor.execute();
+ assertThat(mysqlComStmtExecuteExecutor.getResponseType(), is(ResponseType.UPDATE));
+ }
}