You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/15 07:52:48 UTC
[shardingsphere] branch master updated: Refactor transaction
process in proxy frontend (#7452)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 258a11b Refactor transaction process in proxy frontend (#7452)
258a11b is described below
commit 258a11bbfd1bc46de596b785cc59149026593f16
Author: Liang Zhang <te...@163.com>
AuthorDate: Tue Sep 15 15:52:23 2020 +0800
Refactor transaction process in proxy frontend (#7452)
* Rename BackendConnection.releaseAllResources
* Rename BackendConnection.closeAllResources
* Refactor FrontendChannelInboundHandler
* Refactor CommandExecutorTask
* Remove BackendConnection.close
* Fix test case
* Use dynamic transaction status in CommandExecutorTask
---
.../jdbc/connection/BackendConnection.java | 64 ++++--------
.../transaction/BackendTransactionManager.java | 2 +-
.../jdbc/transaction/TransactionStatus.java | 9 ++
.../jdbc/connection/BackendConnectionTest.java | 113 +++------------------
.../trnasaction/BackendTransactionManagerTest.java | 4 +-
.../frontend/command/CommandExecutorTask.java | 53 ++++++++--
.../netty/FrontendChannelInboundHandler.java | 15 ++-
.../frontend/command/CommandExecutorTaskTest.java | 15 ++-
8 files changed, 114 insertions(+), 161 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 8edc734..11260c2 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.connection.JD
import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.group.StatementOption;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
-import org.apache.shardingsphere.masterslave.route.engine.impl.MasterVisitedManager;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -53,7 +52,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
* Backend connection.
*/
@Getter
-public final class BackendConnection implements JDBCExecutionConnection, AutoCloseable {
+public final class BackendConnection implements JDBCExecutionConnection {
static {
ShardingSphereServiceLoader.register(StatementMemoryStrictlyFetchSizeSetter.class);
@@ -146,6 +145,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
return result;
}
+ private void replayMethodsInvocation(final Object target) {
+ for (MethodInvocation each : methodInvocations) {
+ each.invoke(target);
+ }
+ }
+
@Override
public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
Statement result = connection.createStatement();
@@ -215,30 +220,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
cachedResultSets.add(resultSet);
}
- @Override
- public void close() throws SQLException {
- close(false);
- }
-
/**
- * Close cached connection.
+ * Close result sets.
*
- * @param forceClose force close flag
- * @throws SQLException SQL exception
+ * @return SQL exception when result sets close
*/
- public synchronized void close(final boolean forceClose) throws SQLException {
- Collection<SQLException> exceptions = new LinkedList<>();
- MasterVisitedManager.clear();
- exceptions.addAll(closeResultSets());
- exceptions.addAll(closeStatements());
- if (!transactionStatus.isInTransaction() || forceClose || TransactionType.BASE == transactionStatus.getTransactionType()) {
- exceptions.addAll(releaseConnections(forceClose));
- }
- connectionStatus.switchToReleased();
- throwSQLExceptionIfNecessary(exceptions);
- }
-
- private Collection<SQLException> closeResultSets() {
+ public synchronized Collection<SQLException> closeResultSets() {
Collection<SQLException> result = new LinkedList<>();
for (ResultSet each : cachedResultSets) {
try {
@@ -251,7 +238,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
return result;
}
- private Collection<SQLException> closeStatements() {
+ /**
+ * Close statements.
+ *
+ * @return SQL exception when statements close
+ */
+ public synchronized Collection<SQLException> closeStatements() {
Collection<SQLException> result = new LinkedList<>();
for (Statement each : cachedStatements) {
try {
@@ -265,12 +257,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
}
/**
- * Release connections.
+ * Close connections.
*
* @param forceRollback is force rollback
- * @return SQL exception when connections release
+ * @return SQL exception when connections close
*/
- public Collection<SQLException> releaseConnections(final boolean forceRollback) {
+ public synchronized Collection<SQLException> closeConnections(final boolean forceRollback) {
Collection<SQLException> result = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
try {
@@ -284,23 +276,7 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
}
cachedConnections.clear();
methodInvocations.clear();
+ connectionStatus.switchToReleased();
return result;
}
-
- private void throwSQLExceptionIfNecessary(final Collection<SQLException> exceptions) throws SQLException {
- if (exceptions.isEmpty()) {
- return;
- }
- SQLException ex = new SQLException("");
- for (SQLException each : exceptions) {
- ex.setNextException(each);
- }
- throw ex;
- }
-
- private void replayMethodsInvocation(final Object target) {
- for (MethodInvocation each : methodInvocations) {
- each.invoke(target);
- }
- }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
index 82a6caa..64db95d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
@@ -50,7 +50,7 @@ public final class BackendTransactionManager implements TransactionManager {
public void begin() {
if (!connection.getTransactionStatus().isInTransaction()) {
connection.getTransactionStatus().setInTransaction(true);
- connection.releaseConnections(false);
+ connection.closeConnections(false);
}
if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
localTransactionManager.begin();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java
index c84dd3f..331bdcd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java
@@ -54,4 +54,13 @@ public final class TransactionStatus {
}
this.transactionType = transactionType;
}
+
+ /**
+ * Judge whether in connection held transaction.
+ *
+ * @return is in connection held transaction or not
+ */
+ public boolean isInConnectionHeldTransaction() {
+ return inTransaction && TransactionType.BASE != transactionType;
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
index 6a80bc3..10baac5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
@@ -27,9 +27,9 @@ import org.apache.shardingsphere.infra.context.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.datasource.JDBCBackendDataSource;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.BackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.datasource.JDBCBackendDataSource;
import org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -42,9 +42,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
@@ -52,16 +50,13 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -78,15 +73,23 @@ public final class BackendConnectionTest {
private final BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL);
@Before
- public void setUp() {
+ public void setUp() throws ReflectiveOperationException {
setSchemaContexts();
setTransactionContexts();
setBackendDataSource();
backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
}
- @SneakyThrows(ReflectiveOperationException.class)
- private void setSchemaContexts() {
+ @After
+ public void clean() throws ReflectiveOperationException {
+ Field field = ProxyContext.getInstance().getClass().getDeclaredField("backendDataSource");
+ field.setAccessible(true);
+ Class<?> clazz = field.getType();
+ Object datasource = clazz.getDeclaredConstructors()[0].newInstance();
+ field.set(ProxyContext.getInstance(), datasource);
+ }
+
+ private void setSchemaContexts() throws ReflectiveOperationException {
Field field = ProxyContext.getInstance().getClass().getDeclaredField("schemaContexts");
field.setAccessible(true);
field.set(ProxyContext.getInstance(), new StandardSchemaContexts(createSchemaContextMap(), new Authentication(), new ConfigurationProperties(new Properties()), new MySQLDatabaseType()));
@@ -103,8 +106,7 @@ public final class BackendConnectionTest {
return result;
}
- @SneakyThrows(ReflectiveOperationException.class)
- private void setTransactionContexts() {
+ private void setTransactionContexts() throws ReflectiveOperationException {
Field field = ProxyContext.getInstance().getClass().getDeclaredField("transactionContexts");
field.setAccessible(true);
field.set(ProxyContext.getInstance(), createTransactionContexts());
@@ -119,8 +121,7 @@ public final class BackendConnectionTest {
return result;
}
- @SneakyThrows(ReflectiveOperationException.class)
- private void setBackendDataSource() {
+ private void setBackendDataSource() throws ReflectiveOperationException {
Field field = ProxyContext.getInstance().getClass().getDeclaredField("backendDataSource");
field.setAccessible(true);
field.set(ProxyContext.getInstance(), backendDataSource);
@@ -179,7 +180,7 @@ public final class BackendConnectionTest {
}
@Test
- public void assertMultiThreadGetConnection() throws SQLException, InterruptedException {
+ public void assertMultiThreadsGetConnection() throws SQLException, InterruptedException {
MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
Thread thread1 = new Thread(this::assertOneThreadResult);
@@ -199,80 +200,6 @@ public final class BackendConnectionTest {
assertTrue(backendConnection.getTransactionStatus().isInTransaction());
}
- @Test
- public void assertAutoCloseConnectionWithoutTransaction() throws SQLException {
- BackendConnection actual;
- try (BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL)) {
- backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
- when(backendDataSource.getConnections(anyString(), anyString(), eq(12), any())).thenReturn(MockConnectionUtil.mockNewConnections(12));
- backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
- backendConnection.getConnectionStatus().switchToUsing();
- mockResultSetAndStatement(backendConnection);
- actual = backendConnection;
- }
- assertThat(actual.getConnectionSize(), is(0));
- assertTrue(actual.getCachedConnections().isEmpty());
- assertTrue(actual.getCachedResultSets().isEmpty());
- assertTrue(actual.getCachedStatements().isEmpty());
- }
-
- @Test
- public void assertAutoCloseConnectionWithTransaction() throws SQLException {
- BackendConnection actual;
- try (BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL)) {
- backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
- MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
- when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
- backendConnection.getTransactionStatus().setInTransaction(true);
- backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
- mockResultSetAndStatement(backendConnection);
- actual = backendConnection;
- }
- assertThat(actual.getConnectionSize(), is(12));
- assertThat(actual.getCachedConnections().get("ds1").size(), is(12));
- assertTrue(actual.getCachedResultSets().isEmpty());
- assertTrue(actual.getCachedStatements().isEmpty());
- }
-
- @Test
- public void assertAutoCloseConnectionWithException() {
- BackendConnection actual = null;
- try (BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL)) {
- backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
- backendConnection.getTransactionStatus().setTransactionType(TransactionType.XA);
- backendConnection.getTransactionStatus().setInTransaction(true);
- MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
- backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
- backendConnection.getTransactionStatus().setInTransaction(false);
- mockResultSetAndStatement(backendConnection);
- mockResultSetAndStatementException(backendConnection);
- actual = backendConnection;
- } catch (final SQLException ex) {
- assertThat(ex.getNextException().getNextException(), instanceOf(SQLException.class));
- }
- assertNotNull(actual);
- assertThat(actual.getConnectionSize(), is(0));
- assertTrue(actual.getCachedConnections().isEmpty());
- assertTrue(actual.getCachedResultSets().isEmpty());
- assertTrue(actual.getCachedStatements().isEmpty());
- }
-
- private void mockResultSetAndStatement(final BackendConnection backendConnection) {
- ResultSet resultSet = mock(ResultSet.class);
- Statement statement = mock(Statement.class);
- backendConnection.add(resultSet);
- backendConnection.add(statement);
- }
-
- private void mockResultSetAndStatementException(final BackendConnection backendConnection) throws SQLException {
- for (Statement each : backendConnection.getCachedStatements()) {
- doThrow(SQLException.class).when(each).close();
- }
- for (ResultSet each : backendConnection.getCachedResultSets()) {
- doThrow(SQLException.class).when(each).close();
- }
- }
-
@Test(expected = ShardingSphereException.class)
public void assertFailedSwitchTransactionTypeWhileBegin() {
BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
@@ -286,14 +213,4 @@ public final class BackendConnectionTest {
transactionManager.begin();
backendConnection.setCurrentSchema("newSchema");
}
-
- @SneakyThrows(ReflectiveOperationException.class)
- @After
- public void clean() {
- Field field = ProxyContext.getInstance().getClass().getDeclaredField("backendDataSource");
- field.setAccessible(true);
- Class<?> clazz = field.getType();
- Object datasource = clazz.getDeclaredConstructors()[0].newInstance();
- field.set(ProxyContext.getInstance(), datasource);
- }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java
index 25734cc..83d8158 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java
@@ -86,7 +86,7 @@ public final class BackendTransactionManagerTest {
newBackendTransactionManager(TransactionType.LOCAL, false);
backendTransactionManager.begin();
verify(transactionStatus).setInTransaction(true);
- verify(backendConnection).releaseConnections(false);
+ verify(backendConnection).closeConnections(false);
verify(localTransactionManager).begin();
}
@@ -95,7 +95,7 @@ public final class BackendTransactionManagerTest {
newBackendTransactionManager(TransactionType.XA, true);
backendTransactionManager.begin();
verify(transactionStatus, times(0)).setInTransaction(true);
- verify(backendConnection, times(0)).releaseConnections(false);
+ verify(backendConnection, times(0)).closeConnections(false);
verify(shardingTransactionManager).begin();
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 58b625d..43f3abb 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.infra.hook.RootInvokeHook;
import org.apache.shardingsphere.infra.hook.SPIRootInvokeHook;
+import org.apache.shardingsphere.masterslave.route.engine.impl.MasterVisitedManager;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStatus;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
@@ -36,6 +37,7 @@ import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
import java.sql.SQLException;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.Optional;
/**
@@ -63,28 +65,29 @@ public final class CommandExecutorTask implements Runnable {
public void run() {
RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
rootInvokeHook.start();
- int connectionSize = 0;
boolean isNeedFlush = false;
- try (BackendConnection backendConnection = this.backendConnection;
- PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
+ int connectionSize = 0;
+ try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
ConnectionStatus connectionStatus = backendConnection.getConnectionStatus();
- connectionStatus.waitUntilConnectionRelease();
- connectionStatus.switchToUsing();
+ if (!backendConnection.getTransactionStatus().isInConnectionHeldTransaction()) {
+ connectionStatus.waitUntilConnectionRelease();
+ connectionStatus.switchToUsing();
+ }
isNeedFlush = executeCommand(context, payload, backendConnection);
connectionSize = backendConnection.getConnectionSize();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
- context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex));
- Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
- databasePacket.ifPresent(context::writeAndFlush);
- if (!ExpectedExceptions.isExpected(ex.getClass())) {
- log.error("Exception occur: ", ex);
- }
+ processException(ex);
} finally {
if (isNeedFlush) {
context.flush();
}
+ Collection<SQLException> exceptions = closeExecutionResources();
+ if (!backendConnection.getTransactionStatus().isInConnectionHeldTransaction()) {
+ exceptions.addAll(backendConnection.closeConnections(false));
+ }
+ processClosedExceptions(exceptions);
rootInvokeHook.finish(connectionSize);
}
}
@@ -105,4 +108,32 @@ public final class CommandExecutorTask implements Runnable {
}
return databaseProtocolFrontendEngine.getFrontendContext().isFlushForPerCommandPacket();
}
+
+ private void processException(final Exception cause) {
+ context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(cause));
+ Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
+ databasePacket.ifPresent(context::writeAndFlush);
+ if (!ExpectedExceptions.isExpected(cause.getClass())) {
+ log.error("Exception occur: ", cause);
+ }
+ }
+
+ private Collection<SQLException> closeExecutionResources() {
+ Collection<SQLException> result = new LinkedList<>();
+ MasterVisitedManager.clear();
+ result.addAll(backendConnection.closeResultSets());
+ result.addAll(backendConnection.closeStatements());
+ return result;
+ }
+
+ private void processClosedExceptions(final Collection<SQLException> exceptions) {
+ if (exceptions.isEmpty()) {
+ return;
+ }
+ SQLException ex = new SQLException("");
+ for (SQLException each : exceptions) {
+ ex.setNextException(each);
+ }
+ processException(ex);
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index f0b6df0..414a262 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -23,16 +23,16 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.masterslave.route.engine.impl.MasterVisitedManager;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
import org.apache.shardingsphere.proxy.frontend.auth.AuthenticationResult;
+import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
import org.apache.shardingsphere.proxy.frontend.executor.ChannelThreadExecutorGroup;
import org.apache.shardingsphere.proxy.frontend.executor.CommandExecutorSelector;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.transaction.core.TransactionType;
-import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
/**
@@ -90,10 +90,17 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
}
@Override
- public void channelInactive(final ChannelHandlerContext context) throws SQLException {
+ public void channelInactive(final ChannelHandlerContext context) {
context.fireChannelInactive();
+ closeAllResources(context);
+ }
+
+ private void closeAllResources(final ChannelHandlerContext context) {
databaseProtocolFrontendEngine.release(backendConnection);
- backendConnection.close(true);
+ MasterVisitedManager.clear();
+ backendConnection.closeResultSets();
+ backendConnection.closeStatements();
+ backendConnection.closeConnections(true);
ChannelThreadExecutorGroup.getInstance().unregister(context.channel().id());
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index b708ef5..81f2611 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -56,7 +57,7 @@ public final class CommandExecutorTaskTest {
@Mock
private PacketPayload payload;
- @Mock
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
private BackendConnection backendConnection;
@Mock
@@ -100,6 +101,9 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
+ when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -117,6 +121,9 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
+ when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -139,6 +146,9 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
+ when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -156,6 +166,9 @@ public final class CommandExecutorTaskTest {
when(executeEngine.getErrorPacket(eq(mockException))).thenReturn(databasePacket);
when(executeEngine.getOtherPacket()).thenReturn(Optional.of(databasePacket));
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
+ when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+ when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(handlerContext, atLeast(2)).writeAndFlush(databasePacket);