You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text version.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/05/25 00:42:52 UTC
[shardingsphere] branch master updated: Fix autocommit (#17904)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4e0f80f1eca Fix autocommit (#17904)
4e0f80f1eca is described below
commit 4e0f80f1eca2651323b8dd86725425094a4299a7
Author: JingShang Lu <ji...@gmail.com>
AuthorDate: Wed May 25 08:42:45 2022 +0800
Fix autocommit (#17904)
* fix issue-16894
* fix
* fix
* Create pom.xml
---
.../proxy/backend/communication/BackendConnection.java | 6 ++++--
.../communication/jdbc/connection/JDBCBackendConnection.java | 8 ++------
.../proxy/backend/text/TextProtocolBackendHandlerFactory.java | 7 +++++++
.../communication/jdbc/connection/JDBCBackendConnectionTest.java | 2 +-
.../proxy/frontend/command/CommandExecutorTask.java | 1 -
.../proxy/frontend/command/CommandExecutorTaskTest.java | 7 +++++--
.../frontend/reactive/command/ReactiveCommandExecuteTaskTest.java | 7 ++++---
7 files changed, 23 insertions(+), 15 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java
index 3d853921a30..94c18688fe1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.proxy.backend.communication;
import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import java.sql.SQLException;
+
/**
* Backend connection for Proxy.
*
@@ -38,9 +40,9 @@ public interface BackendConnection<T> {
* Prepare for task execution.
*
* @return can be Void or Future
- * @throws BackendConnectionException backend connection exception
+ * @throws SQLException SQL exception
*/
- T prepareForTaskExecution() throws BackendConnectionException;
+ T prepareForTaskExecution() throws SQLException;
/**
* Close resources used in execution.
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
index c59d9ac80b2..be802510913 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java
@@ -175,16 +175,12 @@ public final class JDBCBackendConnection implements BackendConnection<Void>, Exe
}
@Override
- public Void prepareForTaskExecution() throws BackendConnectionException {
+ public Void prepareForTaskExecution() throws SQLException {
synchronized (this) {
connectionReferenceCount++;
if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
JDBCBackendTransactionManager transactionManager = new JDBCBackendTransactionManager(this);
- try {
- transactionManager.begin();
- } catch (SQLException ex) {
- throw new BackendConnectionException(ex);
- }
+ transactionManager.begin();
}
return null;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java
index 83366e2aa1f..5e8e18ead6c 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java
@@ -101,6 +101,7 @@ public final class TextProtocolBackendHandlerFactory {
}
return DistSQLBackendHandlerFactory.newInstance(databaseType, (DistSQLStatement) sqlStatement, connectionSession);
}
+ handleAutoCommit(sqlStatement, connectionSession);
SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getDatabaseMap(),
sqlStatement, connectionSession.getDefaultDatabaseName());
Optional<TextProtocolBackendHandler> backendHandler = DatabaseAdminBackendHandlerFactory.newInstance(databaseType, sqlStatementContext, connectionSession, sql);
@@ -138,6 +139,12 @@ public final class TextProtocolBackendHandlerFactory {
: ProxyContext.getInstance().getContextManager().getMetaDataContexts().getDatabaseMetaData(databaseName).getProtocolType();
}
+ private static void handleAutoCommit(final SQLStatement sqlStatement, final ConnectionSession connectionSession) throws SQLException {
+ if (!(sqlStatement instanceof TCLStatement)) {
+ connectionSession.getBackendConnection().prepareForTaskExecution();
+ }
+ }
+
private static Optional<ExtraTextProtocolBackendHandler> findExtraTextProtocolBackendHandler(final SQLStatement sqlStatement) {
for (ExtraTextProtocolBackendHandler each : ShardingSphereServiceLoader.getServiceInstances(ExtraTextProtocolBackendHandler.class)) {
if (each.accept(sqlStatement)) {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
index 5c52358ff4b..b9e10676a81 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java
@@ -382,7 +382,7 @@ public final class JDBCBackendConnectionTest extends ProxyContextRestorer {
}
@Test
- public void assertPrepareForTaskExecution() throws BackendConnectionException {
+ public void assertPrepareForTaskExecution() throws SQLException {
backendConnection.prepareForTaskExecution();
verify(backendConnection).closeDatabaseCommunicationEngines(true);
verify(backendConnection).closeConnections(false);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 5d415b64590..bcba994d6f4 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -73,7 +73,6 @@ public final class CommandExecutorTask implements Runnable {
if (sqlShowEnabled) {
fillLogMDC();
}
- connectionSession.getBackendConnection().prepareForTaskExecution();
isNeedFlush = executeCommand(context, payload);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 37d0faf84a0..41c28c7b44e 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -152,10 +152,13 @@ public final class CommandExecutorTaskTest extends ProxyContextRestorer {
@SuppressWarnings("unchecked")
@Test
- public void assertRunWithError() throws BackendConnectionException {
+ public void assertRunWithError() throws BackendConnectionException, SQLException {
RuntimeException mockException = new RuntimeException("mock");
- doThrow(mockException).when(backendConnection).prepareForTaskExecution();
+ doThrow(mockException).when(commandExecutor).execute();
when(engine.getCodecEngine().createPacketPayload(message, StandardCharsets.UTF_8)).thenReturn(payload);
+ when(engine.getCommandExecuteEngine().getCommandPacket(payload, commandPacketType, connectionSession)).thenReturn(commandPacket);
+ when(engine.getCommandExecuteEngine().getCommandPacketType(payload)).thenReturn(commandPacketType);
+ when(engine.getCommandExecuteEngine().getCommandExecutor(commandPacketType, commandPacket, connectionSession)).thenReturn(commandExecutor);
when(engine.getCommandExecuteEngine().getErrorPacket(mockException)).thenReturn(databasePacket);
when(engine.getCommandExecuteEngine().getOtherPacket(connectionSession)).thenReturn(Optional.of(databasePacket));
CommandExecutorTask actual = new CommandExecutorTask(engine, connectionSession, handlerContext, message);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java
index 6e077b7d384..01901789c20 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java
@@ -34,6 +34,7 @@ import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
@@ -67,7 +68,7 @@ public final class ReactiveCommandExecuteTaskTest {
private ReactiveCommandExecutor reactiveCommandExecutor;
@Before
- public void setup() throws BackendConnectionException {
+ public void setup() throws BackendConnectionException, SQLException {
reactiveCommandExecuteTask = new ReactiveCommandExecuteTask(frontendEngine, connectionSession, channelHandlerContext, message);
when(connectionSession.getBackendConnection().prepareForTaskExecution()).thenReturn(Future.succeededFuture());
when(connectionSession.getBackendConnection().closeExecutionResources()).thenReturn(Future.succeededFuture());
@@ -110,8 +111,8 @@ public final class ReactiveCommandExecuteTaskTest {
}
@Test
- public void assertExecuteAndExceptionOccur() throws BackendConnectionException {
- BackendConnectionException ex = new BackendConnectionException(Collections.emptyList());
+ public void assertExecuteAndExceptionOccur() throws SQLException {
+ SQLException ex = new SQLException("");
when(connectionSession.getBackendConnection().prepareForTaskExecution()).thenThrow(ex);
DatabasePacket errorPacket = mock(DatabasePacket.class);
when(frontendEngine.getCommandExecuteEngine().getErrorPacket(ex)).thenReturn(errorPacket);