You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/06/23 10:27:30 UTC

[shardingsphere] branch master updated: Move cached statements and result sets into DatabaseCommunicationEngine (#10932)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ba64144  Move cached statements and result sets into DatabaseCommunicationEngine (#10932)
ba64144 is described below

commit ba6414447a80ac5a98a2ac30464b1964b6d7c5b4
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Wed Jun 23 18:27:05 2021 +0800

    Move cached statements and result sets into DatabaseCommunicationEngine (#10932)
    
    * Move cached statements and result sets into DatabaseCommunicationEngine
    
    * Complete tests for DatabaseCommunicationEngine
    
    * Close TextProtocolBackendHandler correctly
    
    * Add DatabaseCommunicationEngine to BackendConnection
---
 .../communication/DatabaseCommunicationEngine.java | 75 +++++++++++++++-
 .../DatabaseCommunicationEngineFactory.java        |  8 +-
 .../backend/communication/ProxySQLExecutor.java    |  9 +-
 .../jdbc/connection/BackendConnection.java         | 50 +++--------
 .../jdbc/executor/ProxyJDBCExecutor.java           |  7 +-
 .../callback/ProxyJDBCExecutorCallback.java        | 12 +--
 .../callback/ProxyJDBCExecutorCallbackFactory.java | 15 ++--
 .../ProxyPreparedStatementExecutorCallback.java    |  6 +-
 .../impl/ProxyStatementExecutorCallback.java       |  6 +-
 .../backend/text/TextProtocolBackendHandler.java   |  8 ++
 .../impl/SchemaAssignedDatabaseBackendHandler.java |  5 ++
 .../data/impl/UnicastDatabaseBackendHandler.java   |  5 ++
 .../DatabaseCommunicationEngineTest.java           | 99 ++++++++++++++++++++--
 .../jdbc/connection/BackendConnectionTest.java     | 73 ----------------
 .../frontend/command/CommandExecutorTask.java      | 20 +++--
 .../netty/FrontendChannelInboundHandler.java       |  3 +-
 .../frontend/command/CommandExecutorTaskTest.java  | 11 +--
 .../execute/MySQLComStmtExecuteExecutor.java       |  7 +-
 .../fieldlist/MySQLComFieldListPacketExecutor.java |  7 +-
 .../text/query/MySQLComQueryPacketExecutor.java    |  5 ++
 .../query/text/PostgreSQLComQueryExecutor.java     |  5 ++
 .../frontend/command/executor/CommandExecutor.java |  8 ++
 22 files changed, 274 insertions(+), 170 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index 2f1a058..b770f31 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -44,11 +44,15 @@ import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryH
 import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeaderBuilder;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /**
@@ -73,18 +77,40 @@ public final class DatabaseCommunicationEngine {
     
     private ProxyLockEngine proxyLockEngine;
     
+    private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();
+    
+    private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();
+    
     public DatabaseCommunicationEngine(final String driverType, final ShardingSphereMetaData metaData, final LogicSQL logicSQL, final BackendConnection backendConnection) {
         this.driverType = driverType;
         this.metaData = metaData;
         this.logicSQL = logicSQL;
-        proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection);
+        proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection, this);
         kernelProcessor = new KernelProcessor();
-        proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new MetadataRefreshEngine(metaData, 
-                ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()), 
+        proxyLockEngine = new ProxyLockEngine(proxySQLExecutor, new MetadataRefreshEngine(metaData,
+                ProxyContext.getInstance().getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().get(backendConnection.getSchemaName()),
                 ProxyContext.getInstance().getMetaDataContexts().getProps(), ProxyContext.getInstance().getLock().orElse(null)), backendConnection.getSchemaName());
     }
     
     /**
+     * Add statement.
+     *
+     * @param statement statement to be added
+     */
+    public void add(final Statement statement) {
+        cachedStatements.add(statement);
+    }
+    
+    /**
+     * Add result set.
+     *
+     * @param resultSet result set to be added
+     */
+    public void add(final ResultSet resultSet) {
+        cachedResultSets.add(resultSet);
+    }
+    
+    /**
      * Execute to database.
      *
      * @return backend response
@@ -192,4 +218,47 @@ public final class DatabaseCommunicationEngine {
     private boolean isBinary() {
         return JDBCDriverType.PREPARED_STATEMENT.equals(driverType);
     }
+    
+    /**
+     * Close database communication engine.
+     *
+     * @throws SQLException SQL exception
+     */
+    public void close() throws SQLException {
+        Collection<SQLException> result = new LinkedList<>();
+        result.addAll(closeResultSets());
+        result.addAll(closeStatements());
+        if (result.isEmpty()) {
+            return;
+        }
+        SQLException ex = new SQLException();
+        result.forEach(ex::setNextException);
+        throw ex;
+    }
+    
+    private Collection<SQLException> closeResultSets() {
+        Collection<SQLException> result = new LinkedList<>();
+        for (ResultSet each : cachedResultSets) {
+            try {
+                each.close();
+            } catch (final SQLException ex) {
+                result.add(ex);
+            }
+        }
+        cachedResultSets.clear();
+        return result;
+    }
+    
+    private Collection<SQLException> closeStatements() {
+        Collection<SQLException> result = new LinkedList<>();
+        for (Statement each : cachedStatements) {
+            try {
+                each.close();
+            } catch (final SQLException ex) {
+                result.add(ex);
+            }
+        }
+        cachedStatements.clear();
+        return result;
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
index e010424..459cc06 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineFactory.java
@@ -60,7 +60,9 @@ public final class DatabaseCommunicationEngineFactory {
     public DatabaseCommunicationEngine newTextProtocolInstance(final SQLStatement sqlStatement, final String sql, final BackendConnection backendConnection) {
         ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
         LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, Collections.emptyList(), metaData);
-        return new DatabaseCommunicationEngine(JDBCDriverType.STATEMENT, metaData, logicSQL, backendConnection);
+        DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.STATEMENT, metaData, logicSQL, backendConnection);
+        backendConnection.add(result);
+        return result;
     }
     
     /**
@@ -75,7 +77,9 @@ public final class DatabaseCommunicationEngineFactory {
     public DatabaseCommunicationEngine newBinaryProtocolInstance(final SQLStatement sqlStatement, final String sql, final List<Object> parameters, final BackendConnection backendConnection) {
         ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
         LogicSQL logicSQL = createLogicSQL(sqlStatement, sql, new ArrayList<>(parameters), metaData);
-        return new DatabaseCommunicationEngine(JDBCDriverType.PREPARED_STATEMENT, metaData, logicSQL, backendConnection);
+        DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.PREPARED_STATEMENT, metaData, logicSQL, backendConnection);
+        backendConnection.add(result);
+        return result;
     }
     
     private LogicSQL createLogicSQL(final SQLStatement sqlStatement, final String sql, final List<Object> parameters, final ShardingSphereMetaData metaData) {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index fba5049..0df7432 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -67,18 +67,21 @@ public final class ProxySQLExecutor {
     
     private final BackendConnection backendConnection;
     
+    private final DatabaseCommunicationEngine databaseCommunicationEngine;
+    
     private final ProxyJDBCExecutor jdbcExecutor;
     
     private final RawExecutor rawExecutor;
     
     private final FederateExecutor federateExecutor;
     
-    public ProxySQLExecutor(final String type, final BackendConnection backendConnection) {
+    public ProxySQLExecutor(final String type, final BackendConnection backendConnection, final DatabaseCommunicationEngine databaseCommunicationEngine) {
         this.type = type;
         this.backendConnection = backendConnection;
+        this.databaseCommunicationEngine = databaseCommunicationEngine;
         ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
         boolean isSerialExecute = backendConnection.isSerialExecute();
-        jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, new JDBCExecutor(executorEngine, isSerialExecute));
+        jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection, databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
         MetaDataContexts metaDataContexts = ProxyContext.getInstance().getMetaDataContexts();
         rawExecutor = new RawExecutor(executorEngine, isSerialExecute, metaDataContexts.getProps());
         // TODO Consider FederateRawExecutor
@@ -145,7 +148,7 @@ public final class ProxySQLExecutor {
         }
         MetaDataContexts metaData = ProxyContext.getInstance().getMetaDataContexts();
         ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(type, metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(), 
-                executionContext.getSqlStatementContext().getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
+                executionContext.getSqlStatementContext().getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true);
         backendConnection.setFederateExecutor(federateExecutor);
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaData);
         return federateExecutor.executeQuery(executionContext, callback, prepareEngine).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index 1bcffb8..0beea56 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.Statemen
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -39,7 +40,6 @@ import org.apache.shardingsphere.transaction.core.TransactionType;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
@@ -74,9 +74,7 @@ public final class BackendConnection implements ExecutorJDBCManager {
     
     private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
     
-    private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();
-    
-    private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();
+    private final Collection<DatabaseCommunicationEngine> cachedDatabaseCommunicationEngines = new CopyOnWriteArrayList<>();
     
     private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
     
@@ -213,56 +211,28 @@ public final class BackendConnection implements ExecutorJDBCManager {
     }
     
     /**
-     * Add statement.
-     *
-     * @param statement statement to be added
-     */
-    public void add(final Statement statement) {
-        cachedStatements.add(statement);
-    }
-    
-    /**
-     * Add result set.
+     * Add database communication engine.
      *
-     * @param resultSet result set to be added
+     * @param databaseCommunicationEngine database communication engine to be added
      */
-    public void add(final ResultSet resultSet) {
-        cachedResultSets.add(resultSet);
-    }
-    
-    /**
-     * Close result sets.
-     *
-     * @return SQL exception when result sets close
-     */
-    public synchronized Collection<SQLException> closeResultSets() {
-        Collection<SQLException> result = new LinkedList<>();
-        for (ResultSet each : cachedResultSets) {
-            try {
-                each.close();
-            } catch (final SQLException ex) {
-                result.add(ex);
-            }
-        }
-        cachedResultSets.clear();
-        return result;
+    public void add(final DatabaseCommunicationEngine databaseCommunicationEngine) {
+        cachedDatabaseCommunicationEngines.add(databaseCommunicationEngine);
     }
     
     /**
-     * Close statements.
+     * Close database communication engines.
      *
-     * @return SQL exception when statements close
+     * @return SQL exception when engine close
      */
-    public synchronized Collection<SQLException> closeStatements() {
+    public synchronized Collection<SQLException> closeDatabaseCommunicationEngines() {
         Collection<SQLException> result = new LinkedList<>();
-        for (Statement each : cachedStatements) {
+        for (DatabaseCommunicationEngine each : cachedDatabaseCommunicationEngines) {
             try {
                 each.close();
             } catch (final SQLException ex) {
                 result.add(ex);
             }
         }
-        cachedStatements.clear();
         return result;
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 4087d10..d6e12c6 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -44,6 +45,8 @@ public final class ProxyJDBCExecutor {
     
     private final BackendConnection backendConnection;
     
+    private final DatabaseCommunicationEngine databaseCommunicationEngine;
+    
     @Getter
     private final JDBCExecutor jdbcExecutor;
     
@@ -64,8 +67,8 @@ public final class ProxyJDBCExecutor {
             DatabaseType databaseType = metaDataContexts.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType();
             ExecuteProcessEngine.initialize(context, executionGroupContext, metaDataContexts.getProps());
             Collection<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
-                    ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, true),
-                    ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, false));
+                    ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, true),
+                    ProxyJDBCExecutorCallbackFactory.newInstance(type, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, false));
             ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
             return result;
         } finally {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
index 2d05d4f..3285833 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallback.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
@@ -43,7 +43,7 @@ import java.util.Optional;
  */
 public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<ExecuteResult> {
     
-    private final BackendConnection backendConnection;
+    private final DatabaseCommunicationEngine databaseCommunicationEngine;
     
     private final boolean isReturnGeneratedKeys;
     
@@ -51,10 +51,10 @@ public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<Exe
     
     private boolean hasMetaData;
     
-    public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
+    public ProxyJDBCExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final DatabaseCommunicationEngine databaseCommunicationEngine,
                                      final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
         super(databaseType, sqlStatement, isExceptionThrown);
-        this.backendConnection = backendConnection;
+        this.databaseCommunicationEngine = databaseCommunicationEngine;
         this.isReturnGeneratedKeys = isReturnGeneratedKeys;
         this.fetchMetaData = fetchMetaData;
     }
@@ -69,10 +69,10 @@ public abstract class ProxyJDBCExecutorCallback extends JDBCExecutorCallback<Exe
     }
     
     private ExecuteResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final boolean withMetadata) throws SQLException {
-        backendConnection.add(statement);
+        databaseCommunicationEngine.add(statement);
         if (execute(sql, statement, isReturnGeneratedKeys)) {
             ResultSet resultSet = statement.getResultSet();
-            backendConnection.add(resultSet);
+            databaseCommunicationEngine.add(resultSet);
             return createQueryResult(resultSet, connectionMode);
         }
         return new UpdateResult(statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0L);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
index 3967393..f7edf40 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/ProxyJDBCExecutorCallbackFactory.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyPreparedStatementExecutorCallback;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl.ProxyStatementExecutorCallback;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -34,23 +34,24 @@ public final class ProxyJDBCExecutorCallbackFactory {
     
     /**
      * Create new instance of Proxy JDBC executor callback.
-     * 
+     *
      * @param type driver type
      * @param databaseType database type
      * @param sqlStatement SQL statement
-     * @param backendConnection backend connection
+     * @param databaseCommunicationEngine database communication engine
      * @param isReturnGeneratedKeys is return generated keys or not
      * @param isExceptionThrown is exception thrown or not
      * @param isFetchMetaData is fetch meta data or not
      * @return instance of Proxy JDBC executor callback
      */
-    public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
-                                                        final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean isFetchMetaData) {
+    public static ProxyJDBCExecutorCallback newInstance(final String type, final DatabaseType databaseType, final SQLStatement sqlStatement,
+                                                        final DatabaseCommunicationEngine databaseCommunicationEngine, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown,
+                                                        final boolean isFetchMetaData) {
         if (JDBCDriverType.STATEMENT.equals(type)) {
-            return new ProxyStatementExecutorCallback(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+            return new ProxyStatementExecutorCallback(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
         }
         if (JDBCDriverType.PREPARED_STATEMENT.equals(type)) {
-            return new ProxyPreparedStatementExecutorCallback(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
+            return new ProxyPreparedStatementExecutorCallback(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, isFetchMetaData);
         }
         throw new UnsupportedOperationException(String.format("Unsupported driver type: `%s`", type));
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
index 67bcf1c..f8b4a60 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyPreparedStatementExecutorCallback.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
 
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
@@ -31,9 +31,9 @@ import java.sql.Statement;
  */
 public final class ProxyPreparedStatementExecutorCallback extends ProxyJDBCExecutorCallback {
     
-    public ProxyPreparedStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
+    public ProxyPreparedStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final DatabaseCommunicationEngine databaseCommunicationEngine,
                                                   final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
-        super(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+        super(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
index 88647fc..17c2044 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/callback/impl/ProxyStatementExecutorCallback.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.impl;
 
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
@@ -30,9 +30,9 @@ import java.sql.Statement;
  */
 public final class ProxyStatementExecutorCallback extends ProxyJDBCExecutorCallback {
     
-    public ProxyStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final BackendConnection backendConnection,
+    public ProxyStatementExecutorCallback(final DatabaseType databaseType, final SQLStatement sqlStatement, final DatabaseCommunicationEngine databaseCommunicationEngine,
                                           final boolean isReturnGeneratedKeys, final boolean isExceptionThrown, final boolean fetchMetaData) {
-        super(databaseType, sqlStatement, backendConnection, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
+        super(databaseType, sqlStatement, databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown, fetchMetaData);
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
index f87cb1d..5c4e12f 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandler.java
@@ -55,4 +55,12 @@ public interface TextProtocolBackendHandler {
     default Collection<Object> getRowData() throws SQLException {
         return Collections.emptyList();
     }
+    
+    /**
+     * Close handler.
+     *
+     * @throws SQLException SQL exception
+     */
+    default void close() throws SQLException {
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
index f2633a8..8d8ad45 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/SchemaAssignedDatabaseBackendHandler.java
@@ -64,4 +64,9 @@ public final class SchemaAssignedDatabaseBackendHandler implements DatabaseBacke
     public Collection<Object> getRowData() throws SQLException {
         return databaseCommunicationEngine.getQueryResponseRow().getData();
     }
+    
+    @Override
+    public void close() throws SQLException {
+        databaseCommunicationEngine.close();
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
index 57ec70f..3871ad1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/data/impl/UnicastDatabaseBackendHandler.java
@@ -74,4 +74,9 @@ public final class UnicastDatabaseBackendHandler implements DatabaseBackendHandl
     public Collection<Object> getRowData() throws SQLException {
         return databaseCommunicationEngine.getQueryResponseRow().getData();
     }
+    
+    @Override
+    public void close() throws SQLException {
+        databaseCommunicationEngine.close();
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
index 5677758..e2768a4 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngineTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.shardingsphere.proxy.backend.communication;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
 import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
 import org.apache.shardingsphere.infra.context.metadata.impl.StandardMetaDataContexts;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
 import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
@@ -44,11 +44,17 @@ import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLSta
 import org.apache.shardingsphere.transaction.context.impl.StandardTransactionContexts;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.internal.util.reflection.FieldSetter;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.Field;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -56,18 +62,33 @@ import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public final class DatabaseCommunicationEngineTest {
     
+    @Mock
+    private BackendConnection backendConnection;
+    
+    @Mock
+    private Statement statement;
+    
+    @Mock
+    private ResultSet resultSet;
+    
     @Before
     public void setUp() {
-        MetaDataContexts metaDataContexts = new StandardMetaDataContexts(mockMetaDataMap(), mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class), 
+        when(backendConnection.getSchemaName()).thenReturn("schema");
+        MetaDataContexts metaDataContexts = new StandardMetaDataContexts(mockMetaDataMap(), mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class),
                 new ConfigurationProperties(new Properties()));
         ProxyContext.getInstance().init(metaDataContexts, new StandardTransactionContexts());
     }
@@ -81,8 +102,6 @@ public final class DatabaseCommunicationEngineTest {
     
     @Test
     public void assertBinaryProtocolQueryHeader() throws SQLException, NoSuchFieldException {
-        BackendConnection backendConnection = mock(BackendConnection.class);
-        when(backendConnection.getSchemaName()).thenReturn("schema");
         DatabaseCommunicationEngine engine =
                 DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
         assertNotNull(engine);
@@ -95,7 +114,7 @@ public final class DatabaseCommunicationEngineTest {
             private MemoryQueryResultRow memoryQueryResultRow;
             
             @Override
-            protected List<MemoryQueryResultRow> init(final ShardingSphereRule rule, final ShardingSphereSchema schema, 
+            protected List<MemoryQueryResultRow> init(final ShardingSphereRule rule, final ShardingSphereSchema schema,
                                                       final SQLStatementContext sqlStatementContext, final List<QueryResult> queryResults) {
                 memoryQueryResultRow = mock(MemoryQueryResultRow.class);
                 return Collections.singletonList(memoryQueryResultRow);
@@ -117,7 +136,6 @@ public final class DatabaseCommunicationEngineTest {
         ShardingSphereSchema schema = mock(ShardingSphereSchema.class);
         when(schema.get("t_logic_order")).thenReturn(new TableMetaData(Collections.singletonList(columnMetaData), Collections.singletonList(new IndexMetaData("order_id"))));
         DataSourcesMetaData dataSourcesMetaData = mock(DataSourcesMetaData.class);
-        when(dataSourcesMetaData.getDataSourceMetaData("ds_0")).thenReturn(mock(DataSourceMetaData.class));
         when(result.getResource().getDataSourcesMetaData()).thenReturn(dataSourcesMetaData);
         when(result.getSchema()).thenReturn(schema);
         ShardingRule shardingRule = mock(ShardingRule.class);
@@ -132,7 +150,6 @@ public final class DatabaseCommunicationEngineTest {
         when(result.getTableName(1)).thenReturn("t_order");
         when(result.getColumnLabel(1)).thenReturn("order_id");
         when(result.getColumnName(1)).thenReturn("order_id");
-        when(result.getColumnName(2)).thenReturn("expr");
         when(result.getColumnType(1)).thenReturn(Types.INTEGER);
         when(result.isSigned(1)).thenReturn(true);
         when(result.isAutoIncrement(1)).thenReturn(true);
@@ -141,4 +158,72 @@ public final class DatabaseCommunicationEngineTest {
         when(result.isNotNull(1)).thenReturn(true);
         return result;
     }
+    
+    @Test
+    public void assertAddStatementCorrectly() {
+        DatabaseCommunicationEngine engine =
+                DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+        engine.add(statement);
+        Collection<?> actual = getField(engine, "cachedStatements");
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), is(statement));
+    }
+    
+    @Test
+    public void assertAddResultSetCorrectly() {
+        DatabaseCommunicationEngine engine =
+                DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+        engine.add(resultSet);
+        Collection<?> actual = getField(engine, "cachedResultSets");
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), is(resultSet));
+    }
+    
+    @Test
+    public void assertCloseCorrectly() throws SQLException {
+        DatabaseCommunicationEngine engine =
+                DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+        Collection<ResultSet> cachedResultSets = getField(engine, "cachedResultSets");
+        cachedResultSets.add(resultSet);
+        Collection<Statement> cachedStatements = getField(engine, "cachedStatements");
+        cachedStatements.add(statement);
+        engine.close();
+        verify(resultSet).close();
+        verify(statement).close();
+        assertTrue(cachedResultSets.isEmpty());
+        assertTrue(cachedStatements.isEmpty());
+    }
+    
+    @Test
+    public void assertCloseResultSetsWithExceptionThrown() throws SQLException {
+        DatabaseCommunicationEngine engine =
+                DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(mock(MySQLStatement.class), "schemaName", Collections.emptyList(), backendConnection);
+        Collection<ResultSet> cachedResultSets = getField(engine, "cachedResultSets");
+        SQLException sqlExceptionByResultSet = new SQLException("ResultSet");
+        doThrow(sqlExceptionByResultSet).when(resultSet).close();
+        cachedResultSets.add(resultSet);
+        Collection<Statement> cachedStatements = getField(engine, "cachedStatements");
+        SQLException sqlExceptionByStatement = new SQLException("Statement");
+        doThrow(sqlExceptionByStatement).when(statement).close();
+        cachedStatements.add(statement);
+        SQLException actual = null;
+        try {
+            engine.close();
+        } catch (final SQLException ex) {
+            actual = ex;
+        }
+        verify(resultSet).close();
+        verify(statement).close();
+        assertTrue(cachedResultSets.isEmpty());
+        assertTrue(cachedStatements.isEmpty());
+        assertThat(actual.getNextException(), is(sqlExceptionByResultSet));
+        assertThat(actual.getNextException().getNextException(), is(sqlExceptionByStatement));
+    }
+    
+    @SneakyThrows
+    private <T> T getField(final DatabaseCommunicationEngine target, final String fieldName) {
+        Field field = DatabaseCommunicationEngine.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (T) field.get(target);
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
index b7498eb..b9e3ea8 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnectionTest.java
@@ -44,7 +44,6 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collection;
@@ -260,78 +259,6 @@ public final class BackendConnectionTest {
     }
     
     @Test
-    public void assertAddStatementCorrectly() throws NoSuchFieldException, IllegalAccessException {
-        Statement statement = mock(Statement.class);
-        backendConnection.add(statement);
-        Field field = backendConnection.getClass().getDeclaredField("cachedStatements");
-        field.setAccessible(true);
-        assertTrue(((Collection<?>) field.get(backendConnection)).contains(statement));
-    }
-    
-    @Test
-    public void assertAddResultSetCorrectly() throws NoSuchFieldException, IllegalAccessException {
-        ResultSet resultSet = mock(ResultSet.class);
-        backendConnection.add(resultSet);
-        Field field = backendConnection.getClass().getDeclaredField("cachedResultSets");
-        field.setAccessible(true);
-        assertTrue(((Collection<?>) field.get(backendConnection)).contains(resultSet));
-    }
-    
-    @Test
-    public void assertCloseResultSetsCorrectly() throws NoSuchFieldException, SQLException, IllegalAccessException {
-        Field field = backendConnection.getClass().getDeclaredField("cachedResultSets");
-        field.setAccessible(true);
-        Collection<ResultSet> cachedResultSets = (Collection<ResultSet>) field.get(backendConnection);
-        ResultSet resultSet = mock(ResultSet.class);
-        cachedResultSets.add(resultSet);
-        backendConnection.closeResultSets();
-        verify(resultSet, times(1)).close();
-        assertTrue(cachedResultSets.isEmpty());
-    }
-    
-    @Test
-    public void assertCloseResultSetsWithExceptionThrown() throws NoSuchFieldException, SQLException, IllegalAccessException {
-        Field field = backendConnection.getClass().getDeclaredField("cachedResultSets");
-        field.setAccessible(true);
-        Collection<ResultSet> cachedResultSets = (Collection<ResultSet>) field.get(backendConnection);
-        ResultSet resultSet = mock(ResultSet.class);
-        SQLException sqlException = new SQLException("");
-        doThrow(sqlException).when(resultSet).close();
-        cachedResultSets.add(resultSet);
-        Collection<SQLException> result = backendConnection.closeResultSets();
-        verify(resultSet, times(1)).close();
-        assertTrue(cachedResultSets.isEmpty());
-        assertTrue(result.contains(sqlException));
-    }
-    
-    @Test
-    public void assertCloseStatementsCorrectly() throws NoSuchFieldException, SQLException, IllegalAccessException {
-        Field field = backendConnection.getClass().getDeclaredField("cachedStatements");
-        field.setAccessible(true);
-        Collection<Statement> cachedStatement = (Collection<Statement>) field.get(backendConnection);
-        Statement statement = mock(Statement.class);
-        cachedStatement.add(statement);
-        backendConnection.closeStatements();
-        verify(statement, times(1)).close();
-        assertTrue(cachedStatement.isEmpty());
-    }
-    
-    @Test
-    public void assertCloseStatementsWithExceptionThrown() throws SQLException, NoSuchFieldException, IllegalAccessException {
-        Field field = backendConnection.getClass().getDeclaredField("cachedStatements");
-        field.setAccessible(true);
-        Collection<Statement> cachedStatement = (Collection<Statement>) field.get(backendConnection);
-        Statement statement = mock(Statement.class);
-        cachedStatement.add(statement);
-        SQLException sqlException = new SQLException("");
-        doThrow(sqlException).when(statement).close();
-        Collection<SQLException> result = backendConnection.closeStatements();
-        verify(statement, times(1)).close();
-        assertTrue(cachedStatement.isEmpty());
-        assertTrue(result.contains(sqlException));
-    }
-    
-    @Test
     public void assertCloseConnectionsCorrectlyWhenNotForceRollback() throws NoSuchFieldException, IllegalAccessException, SQLException {
         Field field = backendConnection.getClass().getDeclaredField("cachedConnections");
         field.setAccessible(true);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index e92d22f..d55dabd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -90,13 +90,17 @@ public final class CommandExecutorTask implements Runnable {
         CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
         CommandPacket commandPacket = commandExecuteEngine.getCommandPacket(payload, type, backendConnection);
         CommandExecutor commandExecutor = commandExecuteEngine.getCommandExecutor(type, commandPacket, backendConnection);
-        Collection<DatabasePacket<?>> responsePackets = commandExecutor.execute();
-        if (responsePackets.isEmpty()) {
-            return false;
-        }
-        responsePackets.forEach(context::write);
-        if (commandExecutor instanceof QueryCommandExecutor) {
-            return commandExecuteEngine.writeQueryData(context, backendConnection, (QueryCommandExecutor) commandExecutor, responsePackets.size());
+        try {
+            Collection<DatabasePacket<?>> responsePackets = commandExecutor.execute();
+            if (responsePackets.isEmpty()) {
+                return false;
+            }
+            responsePackets.forEach(context::write);
+            if (commandExecutor instanceof QueryCommandExecutor) {
+                return commandExecuteEngine.writeQueryData(context, backendConnection, (QueryCommandExecutor) commandExecutor, responsePackets.size());
+            }
+        } finally {
+            commandExecutor.close();
         }
         return databaseProtocolFrontendEngine.getFrontendContext().isFlushForPerCommandPacket();
     }
@@ -113,8 +117,6 @@ public final class CommandExecutorTask implements Runnable {
     private Collection<SQLException> closeExecutionResources() {
         Collection<SQLException> result = new LinkedList<>();
         PrimaryVisitedManager.clear();
-        result.addAll(backendConnection.closeResultSets());
-        result.addAll(backendConnection.closeStatements());
         result.addAll(backendConnection.closeFederateExecutor());
         return result;
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index d4d2984..cd6c297 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -94,8 +94,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
     private void closeAllResources() {
         ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
         PrimaryVisitedManager.clear();
-        backendConnection.closeResultSets();
-        backendConnection.closeStatements();
+        backendConnection.closeDatabaseCommunicationEngines();
         backendConnection.closeConnections(true);
         backendConnection.closeFederateExecutor();
         databaseProtocolFrontendEngine.release(backendConnection);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 822d715..b9422df 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -102,14 +102,13 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
-        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
         verify(connectionStatus).switchToUsing();
+        verify(queryCommandExecutor).close();
     }
     
     @Test
@@ -123,8 +122,6 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
-        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
@@ -134,6 +131,7 @@ public final class CommandExecutorTaskTest {
         verify(handlerContext).write(databasePacket);
         verify(handlerContext).flush();
         verify(executeEngine).writeQueryData(handlerContext, backendConnection, queryCommandExecutor, 1);
+        verify(queryCommandExecutor).close();
     }
     
     @Test
@@ -148,8 +146,6 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.getConnectionStatus()).thenReturn(connectionStatus);
         when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
         when(engine.getCodecEngine()).thenReturn(codecEngine);
-        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
@@ -158,6 +154,7 @@ public final class CommandExecutorTaskTest {
         verify(connectionStatus).switchToUsing();
         verify(handlerContext).write(databasePacket);
         verify(handlerContext).flush();
+        verify(commandExecutor).close();
     }
     
     @Test
@@ -169,8 +166,6 @@ public final class CommandExecutorTaskTest {
         when(executeEngine.getErrorPacket(mockException, backendConnection)).thenReturn(databasePacket);
         when(executeEngine.getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));
         when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
-        when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
-        when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
         when(backendConnection.closeFederateExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index a6569cd..afcde18 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -25,8 +25,8 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.executor.check.SQLCheckEngine;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
@@ -111,4 +111,9 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
         return new BinaryRow(queryResponseRow.getCells().stream().map(
             each -> new BinaryCell(MySQLBinaryColumnType.valueOfJDBCType(((BinaryQueryResponseCell) each).getJdbcType()), each.getData())).collect(Collectors.toList()));
     }
+    
+    @Override
+    public void close() throws SQLException {
+        databaseCommunicationEngine.close();
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
index 3e2b1bb..9f7e811 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/fieldlist/MySQLComFieldListPacketExecutor.java
@@ -23,12 +23,12 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fie
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.SQLException;
@@ -76,4 +76,9 @@ public final class MySQLComFieldListPacketExecutor implements CommandExecutor {
         result.add(new MySQLEofPacket(++currentSequenceId));
         return result;
     }
+    
+    @Override
+    public void close() throws SQLException {
+        databaseCommunicationEngine.close();
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
index 9984604..9d06568 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java
@@ -79,4 +79,9 @@ public final class MySQLComQueryPacketExecutor implements QueryCommandExecutor {
     public MySQLPacket getQueryRowPacket() throws SQLException {
         return new MySQLTextResultSetRowPacket(++currentSequenceId, textProtocolBackendHandler.getRowData());
     }
+    
+    @Override
+    public void close() throws SQLException {
+        textProtocolBackendHandler.close(); // forward close to the backend handler that supplies getRowData(); what it releases is defined by the handler - confirm there
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
index 5e11134..9bfb337 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
@@ -97,4 +97,9 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
     public PostgreSQLPacket getQueryRowPacket() throws SQLException {
         return new PostgreSQLDataRowPacket(textProtocolBackendHandler.getRowData());
     }
+    
+    @Override
+    public void close() throws SQLException {
+        textProtocolBackendHandler.close(); // forward close to the backend handler that supplies getRowData(); what it releases is defined by the handler - confirm there
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
index c1aa8a4..cc94e31 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/executor/CommandExecutor.java
@@ -34,4 +34,12 @@ public interface CommandExecutor {
      * @throws SQLException SQL exception
      */
     Collection<DatabasePacket<?>> execute() throws SQLException;
+    
+    /**
+     * Close command executor. The default implementation does nothing, so only executors that have something to close need to override it.
+     *
+     * @throws SQLException SQL exception raised by an overriding implementation while closing (the default body never throws)
+     */
+    default void close() throws SQLException {
+    }
 }