You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/09/15 07:52:48 UTC

[shardingsphere] branch master updated: Refactor transaction process in proxy frontend (#7452)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 258a11b  Refactor transaction process in proxy frontend (#7452)
258a11b is described below

commit 258a11bbfd1bc46de596b785cc59149026593f16
Author: Liang Zhang <te...@163.com>
AuthorDate: Tue Sep 15 15:52:23 2020 +0800

    Refactor transaction process in proxy frontend (#7452)
    
    * Rename BackendConnection.releaseAllResources
    
    * Rename BackendConnection.closeAllResources
    
    * Refactor FrontendChannelInboundHandler
    
    * Refactor CommandExecutorTask
    
    * Remove BackendConnection.close
    
    * Fix test case
    
    * Use dynamic transaction status in CommandExecutorTask
---
 .../jdbc/connection/BackendConnection.java         |  64 ++++--------
 .../transaction/BackendTransactionManager.java     |   2 +-
 .../jdbc/transaction/TransactionStatus.java        |   9 ++
 .../jdbc/connection/BackendConnectionTest.java     | 113 +++------------------
 .../trnasaction/BackendTransactionManagerTest.java |   4 +-
 .../frontend/command/CommandExecutorTask.java      |  53 ++++++++--
 .../netty/FrontendChannelInboundHandler.java       |  15 ++-
 .../frontend/command/CommandExecutorTaskTest.java  |  15 ++-
 8 files changed, 114 insertions(+), 161 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 8edc734..11260c2 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.connection.JD
 import org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.group.StatementOption;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.type.TypedSPIRegistry;
-import org.apache.shardingsphere.masterslave.route.engine.impl.MasterVisitedManager;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -53,7 +52,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
  * Backend connection.
  */
 @Getter
-public final class BackendConnection implements JDBCExecutionConnection, AutoCloseable {
+public final class BackendConnection implements JDBCExecutionConnection {
     
     static {
         ShardingSphereServiceLoader.register(StatementMemoryStrictlyFetchSizeSetter.class);
@@ -146,6 +145,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
         return result;
     }
     
+    private void replayMethodsInvocation(final Object target) {
+        for (MethodInvocation each : methodInvocations) {
+            each.invoke(target);
+        }
+    }
+    
     @Override
     public Statement createStorageResource(final Connection connection, final ConnectionMode connectionMode, final StatementOption option) throws SQLException {
         Statement result = connection.createStatement();
@@ -215,30 +220,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
         cachedResultSets.add(resultSet);
     }
     
-    @Override
-    public void close() throws SQLException {
-        close(false);
-    }
-    
     /**
-     * Close cached connection.
+     * Close result sets.
      *
-     * @param forceClose force close flag
-     * @throws SQLException SQL exception
+     * @return SQL exception when result sets close
      */
-    public synchronized void close(final boolean forceClose) throws SQLException {
-        Collection<SQLException> exceptions = new LinkedList<>();
-        MasterVisitedManager.clear();
-        exceptions.addAll(closeResultSets());
-        exceptions.addAll(closeStatements());
-        if (!transactionStatus.isInTransaction() || forceClose || TransactionType.BASE == transactionStatus.getTransactionType()) {
-            exceptions.addAll(releaseConnections(forceClose));
-        }
-        connectionStatus.switchToReleased();
-        throwSQLExceptionIfNecessary(exceptions);
-    }
-    
-    private Collection<SQLException> closeResultSets() {
+    public synchronized Collection<SQLException> closeResultSets() {
         Collection<SQLException> result = new LinkedList<>();
         for (ResultSet each : cachedResultSets) {
             try {
@@ -251,7 +238,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
         return result;
     }
     
-    private Collection<SQLException> closeStatements() {
+    /**
+     * Close statements.
+     *
+     * @return SQL exception when statements close
+     */
+    public synchronized Collection<SQLException> closeStatements() {
         Collection<SQLException> result = new LinkedList<>();
         for (Statement each : cachedStatements) {
             try {
@@ -265,12 +257,12 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
     }
     
     /**
-     * Release connections.
+     * Close connections.
      * 
      * @param forceRollback is force rollback
-     * @return SQL exception when connections release
+     * @return SQL exception when connections close
      */
-    public Collection<SQLException> releaseConnections(final boolean forceRollback) {
+    public synchronized Collection<SQLException> closeConnections(final boolean forceRollback) {
         Collection<SQLException> result = new LinkedList<>();
         for (Connection each : cachedConnections.values()) {
             try {
@@ -284,23 +276,7 @@ public final class BackendConnection implements JDBCExecutionConnection, AutoClo
         }
         cachedConnections.clear();
         methodInvocations.clear();
+        connectionStatus.switchToReleased();
         return result;
     }
-    
-    private void throwSQLExceptionIfNecessary(final Collection<SQLException> exceptions) throws SQLException {
-        if (exceptions.isEmpty()) {
-            return;
-        }
-        SQLException ex = new SQLException("");
-        for (SQLException each : exceptions) {
-            ex.setNextException(each);
-        }
-        throw ex;
-    }
-    
-    private void replayMethodsInvocation(final Object target) {
-        for (MethodInvocation each : methodInvocations) {
-            each.invoke(target);
-        }
-    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
index 82a6caa..64db95d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/BackendTransactionManager.java
@@ -50,7 +50,7 @@ public final class BackendTransactionManager implements TransactionManager {
     public void begin() {
         if (!connection.getTransactionStatus().isInTransaction()) {
             connection.getTransactionStatus().setInTransaction(true);
-            connection.releaseConnections(false);
+            connection.closeConnections(false);
         }
         if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
             localTransactionManager.begin();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java
index c84dd3f..331bdcd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/transaction/TransactionStatus.java
@@ -54,4 +54,13 @@ public final class TransactionStatus {
         }
         this.transactionType = transactionType;
     }
+    
+    /**
+     * Judge whether in connection held transaction.
+     * 
+     * @return is in connection held transaction or not
+     */
+    public boolean isInConnectionHeldTransaction() {
+        return inTransaction && TransactionType.BASE != transactionType;
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
index 6a80bc3..10baac5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
@@ -27,9 +27,9 @@ import org.apache.shardingsphere.infra.context.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.executor.sql.ConnectionMode;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.datasource.JDBCBackendDataSource;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.BackendTransactionManager;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.datasource.JDBCBackendDataSource;
 import org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.apache.shardingsphere.transaction.core.TransactionType;
@@ -42,9 +42,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.Field;
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -52,16 +50,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -78,15 +73,23 @@ public final class BackendConnectionTest {
     private final BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL);
     
     @Before
-    public void setUp() {
+    public void setUp() throws ReflectiveOperationException {
         setSchemaContexts();
         setTransactionContexts();
         setBackendDataSource();
         backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
     }
     
-    @SneakyThrows(ReflectiveOperationException.class)
-    private void setSchemaContexts() {
+    @After
+    public void clean() throws ReflectiveOperationException {
+        Field field = ProxyContext.getInstance().getClass().getDeclaredField("backendDataSource");
+        field.setAccessible(true);
+        Class<?> clazz = field.getType();
+        Object datasource = clazz.getDeclaredConstructors()[0].newInstance();
+        field.set(ProxyContext.getInstance(), datasource);
+    }
+    
+    private void setSchemaContexts() throws ReflectiveOperationException {
         Field field = ProxyContext.getInstance().getClass().getDeclaredField("schemaContexts");
         field.setAccessible(true);
         field.set(ProxyContext.getInstance(), new StandardSchemaContexts(createSchemaContextMap(), new Authentication(), new ConfigurationProperties(new Properties()), new MySQLDatabaseType()));
@@ -103,8 +106,7 @@ public final class BackendConnectionTest {
         return result;
     }
     
-    @SneakyThrows(ReflectiveOperationException.class)
-    private void setTransactionContexts() {
+    private void setTransactionContexts() throws ReflectiveOperationException {
         Field field = ProxyContext.getInstance().getClass().getDeclaredField("transactionContexts");
         field.setAccessible(true);
         field.set(ProxyContext.getInstance(), createTransactionContexts());
@@ -119,8 +121,7 @@ public final class BackendConnectionTest {
         return result;
     }
     
-    @SneakyThrows(ReflectiveOperationException.class)
-    private void setBackendDataSource() {
+    private void setBackendDataSource() throws ReflectiveOperationException {
         Field field = ProxyContext.getInstance().getClass().getDeclaredField("backendDataSource");
         field.setAccessible(true);
         field.set(ProxyContext.getInstance(), backendDataSource);
@@ -179,7 +180,7 @@ public final class BackendConnectionTest {
     }
     
     @Test
-    public void assertMultiThreadGetConnection() throws SQLException, InterruptedException {
+    public void assertMultiThreadsGetConnection() throws SQLException, InterruptedException {
         MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
         when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
         Thread thread1 = new Thread(this::assertOneThreadResult);
@@ -199,80 +200,6 @@ public final class BackendConnectionTest {
         assertTrue(backendConnection.getTransactionStatus().isInTransaction());
     }
     
-    @Test
-    public void assertAutoCloseConnectionWithoutTransaction() throws SQLException {
-        BackendConnection actual;
-        try (BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL)) {
-            backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
-            when(backendDataSource.getConnections(anyString(), anyString(), eq(12), any())).thenReturn(MockConnectionUtil.mockNewConnections(12));
-            backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
-            backendConnection.getConnectionStatus().switchToUsing();
-            mockResultSetAndStatement(backendConnection);
-            actual = backendConnection;
-        }
-        assertThat(actual.getConnectionSize(), is(0));
-        assertTrue(actual.getCachedConnections().isEmpty());
-        assertTrue(actual.getCachedResultSets().isEmpty());
-        assertTrue(actual.getCachedStatements().isEmpty());
-    }
-    
-    @Test
-    public void assertAutoCloseConnectionWithTransaction() throws SQLException {
-        BackendConnection actual;
-        try (BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL)) {
-            backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
-            MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
-            when(backendDataSource.getConnections(anyString(), anyString(), eq(2), any())).thenReturn(MockConnectionUtil.mockNewConnections(2));
-            backendConnection.getTransactionStatus().setInTransaction(true);
-            backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
-            mockResultSetAndStatement(backendConnection);
-            actual = backendConnection;
-        }
-        assertThat(actual.getConnectionSize(), is(12));
-        assertThat(actual.getCachedConnections().get("ds1").size(), is(12));
-        assertTrue(actual.getCachedResultSets().isEmpty());
-        assertTrue(actual.getCachedStatements().isEmpty());
-    }
-    
-    @Test
-    public void assertAutoCloseConnectionWithException() {
-        BackendConnection actual = null;
-        try (BackendConnection backendConnection = new BackendConnection(TransactionType.LOCAL)) {
-            backendConnection.setCurrentSchema(String.format(SCHEMA_PATTERN, 0));
-            backendConnection.getTransactionStatus().setTransactionType(TransactionType.XA);
-            backendConnection.getTransactionStatus().setInTransaction(true);
-            MockConnectionUtil.setCachedConnections(backendConnection, "ds1", 10);
-            backendConnection.getConnections("ds1", 12, ConnectionMode.MEMORY_STRICTLY);
-            backendConnection.getTransactionStatus().setInTransaction(false);
-            mockResultSetAndStatement(backendConnection);
-            mockResultSetAndStatementException(backendConnection);
-            actual = backendConnection;
-        } catch (final SQLException ex) {
-            assertThat(ex.getNextException().getNextException(), instanceOf(SQLException.class));
-        }
-        assertNotNull(actual);
-        assertThat(actual.getConnectionSize(), is(0));
-        assertTrue(actual.getCachedConnections().isEmpty());
-        assertTrue(actual.getCachedResultSets().isEmpty());
-        assertTrue(actual.getCachedStatements().isEmpty());
-    }
-    
-    private void mockResultSetAndStatement(final BackendConnection backendConnection) {
-        ResultSet resultSet = mock(ResultSet.class);
-        Statement statement = mock(Statement.class);
-        backendConnection.add(resultSet);
-        backendConnection.add(statement);
-    }
-    
-    private void mockResultSetAndStatementException(final BackendConnection backendConnection) throws SQLException {
-        for (Statement each : backendConnection.getCachedStatements()) {
-            doThrow(SQLException.class).when(each).close();
-        }
-        for (ResultSet each : backendConnection.getCachedResultSets()) {
-            doThrow(SQLException.class).when(each).close();
-        }
-    }
-    
     @Test(expected = ShardingSphereException.class)
     public void assertFailedSwitchTransactionTypeWhileBegin() {
         BackendTransactionManager transactionManager = new BackendTransactionManager(backendConnection);
@@ -286,14 +213,4 @@ public final class BackendConnectionTest {
         transactionManager.begin();
         backendConnection.setCurrentSchema("newSchema");
     }
-    
-    @SneakyThrows(ReflectiveOperationException.class)
-    @After
-    public void clean() {
-        Field field = ProxyContext.getInstance().getClass().getDeclaredField("backendDataSource");
-        field.setAccessible(true);
-        Class<?> clazz = field.getType();
-        Object datasource = clazz.getDeclaredConstructors()[0].newInstance();
-        field.set(ProxyContext.getInstance(), datasource);
-    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java
index 25734cc..83d8158 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/trnasaction/BackendTransactionManagerTest.java
@@ -86,7 +86,7 @@ public final class BackendTransactionManagerTest {
         newBackendTransactionManager(TransactionType.LOCAL, false);
         backendTransactionManager.begin();
         verify(transactionStatus).setInTransaction(true);
-        verify(backendConnection).releaseConnections(false);
+        verify(backendConnection).closeConnections(false);
         verify(localTransactionManager).begin();
     }
     
@@ -95,7 +95,7 @@ public final class BackendTransactionManagerTest {
         newBackendTransactionManager(TransactionType.XA, true);
         backendTransactionManager.begin();
         verify(transactionStatus, times(0)).setInTransaction(true);
-        verify(backendConnection, times(0)).releaseConnections(false);
+        verify(backendConnection, times(0)).closeConnections(false);
         verify(shardingTransactionManager).begin();
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 58b625d..43f3abb 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
 import org.apache.shardingsphere.infra.hook.RootInvokeHook;
 import org.apache.shardingsphere.infra.hook.SPIRootInvokeHook;
+import org.apache.shardingsphere.masterslave.route.engine.impl.MasterVisitedManager;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStatus;
 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
@@ -36,6 +37,7 @@ import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.Optional;
 
 /**
@@ -63,28 +65,29 @@ public final class CommandExecutorTask implements Runnable {
     public void run() {
         RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
         rootInvokeHook.start();
-        int connectionSize = 0;
         boolean isNeedFlush = false;
-        try (BackendConnection backendConnection = this.backendConnection;
-             PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
+        int connectionSize = 0;
+        try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload((ByteBuf) message)) {
             ConnectionStatus connectionStatus = backendConnection.getConnectionStatus();
-            connectionStatus.waitUntilConnectionRelease();
-            connectionStatus.switchToUsing();
+            if (!backendConnection.getTransactionStatus().isInConnectionHeldTransaction()) {
+                connectionStatus.waitUntilConnectionRelease();
+                connectionStatus.switchToUsing();
+            }
             isNeedFlush = executeCommand(context, payload, backendConnection);
             connectionSize = backendConnection.getConnectionSize();
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
-            context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex));
-            Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
-            databasePacket.ifPresent(context::writeAndFlush);
-            if (!ExpectedExceptions.isExpected(ex.getClass())) {
-                log.error("Exception occur: ", ex);
-            }
+            processException(ex);
         } finally {
             if (isNeedFlush) {
                 context.flush();
             }
+            Collection<SQLException> exceptions = closeExecutionResources();
+            if (!backendConnection.getTransactionStatus().isInConnectionHeldTransaction()) {
+                exceptions.addAll(backendConnection.closeConnections(false));
+            }
+            processClosedExceptions(exceptions);
             rootInvokeHook.finish(connectionSize);
         }
     }
@@ -105,4 +108,32 @@ public final class CommandExecutorTask implements Runnable {
         }
         return databaseProtocolFrontendEngine.getFrontendContext().isFlushForPerCommandPacket();
     }
+    
+    private void processException(final Exception cause) {
+        context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(cause));
+        Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket();
+        databasePacket.ifPresent(context::writeAndFlush);
+        if (!ExpectedExceptions.isExpected(cause.getClass())) {
+            log.error("Exception occur: ", cause);
+        }
+    }
+    
+    private Collection<SQLException> closeExecutionResources() {
+        Collection<SQLException> result = new LinkedList<>();
+        MasterVisitedManager.clear();
+        result.addAll(backendConnection.closeResultSets());
+        result.addAll(backendConnection.closeStatements());
+        return result;
+    }
+    
+    private void processClosedExceptions(final Collection<SQLException> exceptions) {
+        if (exceptions.isEmpty()) {
+            return;
+        }
+        SQLException ex = new SQLException("");
+        for (SQLException each : exceptions) {
+            ex.setNextException(each);
+        }
+        processException(ex);
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index f0b6df0..414a262 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -23,16 +23,16 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.masterslave.route.engine.impl.MasterVisitedManager;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
 import org.apache.shardingsphere.proxy.frontend.auth.AuthenticationResult;
+import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
 import org.apache.shardingsphere.proxy.frontend.executor.ChannelThreadExecutorGroup;
 import org.apache.shardingsphere.proxy.frontend.executor.CommandExecutorSelector;
 import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 
-import java.sql.SQLException;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -90,10 +90,17 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
     }
     
     @Override
-    public void channelInactive(final ChannelHandlerContext context) throws SQLException {
+    public void channelInactive(final ChannelHandlerContext context) {
         context.fireChannelInactive();
+        closeAllResources(context);
+    }
+    
+    private void closeAllResources(final ChannelHandlerContext context) {
         databaseProtocolFrontendEngine.release(backendConnection);
-        backendConnection.close(true);
+        MasterVisitedManager.clear();
+        backendConnection.closeResultSets();
+        backendConnection.closeStatements();
+        backendConnection.closeConnections(true);
         ChannelThreadExecutorGroup.getInstance().unregister(context.channel().id());
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index b708ef5..81f2611 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
 import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Answers;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -56,7 +57,7 @@ public final class CommandExecutorTaskTest {
     @Mock
     private PacketPayload payload;
     
-    @Mock
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private BackendConnection backendConnection;
     
     @Mock
@@ -100,6 +101,9 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
+        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -117,6 +121,9 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
+        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -139,6 +146,9 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
+        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -156,6 +166,9 @@ public final class CommandExecutorTaskTest {
         when(executeEngine.getErrorPacket(eq(mockException))).thenReturn(databasePacket);
         when(executeEngine.getOtherPacket()).thenReturn(Optional.of(databasePacket));
         when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
+        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
+        when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(handlerContext, atLeast(2)).writeAndFlush(databasePacket);