You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/06/23 12:42:04 UTC

[shardingsphere] branch master updated: Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f9fc2f352f5 Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)
f9fc2f352f5 is described below

commit f9fc2f352f5806f2d553d512e2704dca6aa2e208
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Thu Jun 23 20:41:55 2022 +0800

    Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)
    
    * Refactor PreparedStatement of MySQL Proxy
    
    * Complete tests in db-protocol-mysql
    
    * Complete MySQLCommandPacketFactoryTest
    
    * Complete MySQLComStmtExecuteExecutorTest
    
    * Add PreparedStatementRegistryTest
    
    * Add MySQLComStmtCloseExecutorTest
    
    * Complete MySQLComStmtPrepareExecutorTest
    
    * Fix code style in MySQLComStmtPrepareExecutorTest
    
    * Move MySQLStatementIDGenerator from protocol to frontend
---
 .../binary/MySQLPreparedStatementRegistry.java     | 114 ---------------------
 .../binary/MySQLPreparedStatementRegistryTest.java |  78 --------------
 .../execute/MySQLComStmtExecutePacketTest.java     |  11 --
 .../proxy/backend/session/ConnectionSession.java   |   2 +
 .../proxy/backend/session/PreparedStatement.java   |  37 ++++---
 .../backend/session/PreparedStatementRegistry.java |  60 +++++++++++
 .../session/PreparedStatementRegistryTest.java     |  57 +++++++++++
 .../proxy/frontend/mysql/MySQLFrontendEngine.java  |   4 +-
 .../authentication/MySQLAuthenticationEngine.java  |   4 +-
 .../mysql/command/MySQLCommandExecuteEngine.java   |   2 +-
 .../mysql/command/MySQLCommandExecutorFactory.java |   2 +-
 .../mysql/command/MySQLCommandPacketFactory.java   |  11 +-
 .../query/binary/MySQLPreparedStatement.java       |   9 +-
 .../query/binary/MySQLStatementIDGenerator.java    |  73 +++++++++++++
 .../binary/close/MySQLComStmtCloseExecutor.java    |  14 +--
 .../execute/MySQLComStmtExecuteExecutor.java       |  17 +--
 .../prepare/MySQLComStmtPrepareExecutor.java       |   9 +-
 .../command/MySQLCommandPacketFactoryTest.java     |  79 +++++++-------
 .../binary/MySQLStatementIDGeneratorTest.java      |  46 +++++++++
 .../close/MySQLComStmtCloseExecutorTest.java       |  44 ++++++++
 .../execute/MySQLComStmtExecuteExecutorTest.java   |  45 ++++++--
 .../prepare/MySQLComStmtPrepareExecutorTest.java   |  93 ++++++++++++++++-
 .../ReactiveMySQLComStmtExecuteExecutor.java       |  19 ++--
 23 files changed, 521 insertions(+), 309 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
deleted file mode 100644
index 273e56dfccf..00000000000
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * MySQL prepared statement registry.
- */
-@NoArgsConstructor(access = AccessLevel.NONE)
-public final class MySQLPreparedStatementRegistry {
-    
-    private static final MySQLPreparedStatementRegistry INSTANCE = new MySQLPreparedStatementRegistry();
-    
-    private final ConcurrentMap<Integer, MySQLConnectionPreparedStatements> connectionRegistry = new ConcurrentHashMap<>(8192, 1);
-    
-    /**
-     * Get prepared statement registry instance.
-     *
-     * @return prepared statement registry instance
-     */
-    public static MySQLPreparedStatementRegistry getInstance() {
-        return INSTANCE;
-    }
-    
-    /**
-     * Register connection.
-     *
-     * @param connectionId connection ID
-     */
-    public void registerConnection(final int connectionId) {
-        connectionRegistry.put(connectionId, new MySQLConnectionPreparedStatements());
-    }
-    
-    /**
-     * Get connection prepared statements.
-     * 
-     * @param connectionId connection ID
-     * @return MySQL connection prepared statements
-     */
-    public MySQLConnectionPreparedStatements getConnectionPreparedStatements(final int connectionId) {
-        return connectionRegistry.get(connectionId);
-    }
-    
-    /**
-     * Unregister connection.
-     *
-     * @param connectionId connection ID
-     */
-    public void unregisterConnection(final int connectionId) {
-        connectionRegistry.remove(connectionId);
-    }
-    
-    public static class MySQLConnectionPreparedStatements {
-        
-        private final Map<Integer, MySQLPreparedStatement> preparedStatements = new ConcurrentHashMap<>(16384, 1);
-        
-        private final AtomicInteger sequence = new AtomicInteger();
-        
-        /**
-         * Prepare statement.
-         *
-         * @param sql SQL
-         * @param sqlStatement sql statement of prepared statement
-         * @return statement ID
-         */
-        public int prepareStatement(final String sql, final SQLStatement sqlStatement) {
-            int result = sequence.incrementAndGet();
-            preparedStatements.put(result, new MySQLPreparedStatement(sql, sqlStatement));
-            return result;
-        }
-        
-        /**
-         * Get prepared statement.
-         *
-         * @param statementId statement ID
-         * @return prepared statement
-         */
-        public MySQLPreparedStatement get(final int statementId) {
-            return preparedStatements.get(statementId);
-        }
-        
-        /**
-         * Close statement.
-         *
-         * @param statementId statement ID
-         */
-        public void closeStatement(final int statementId) {
-            preparedStatements.remove(statementId);
-        }
-    }
-}
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
deleted file mode 100644
index 3808fa3a86b..00000000000
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-public final class MySQLPreparedStatementRegistryTest {
-    
-    private static final int CONNECTION_ID = 1;
-    
-    private static final String SQL = "SELECT * FROM tbl WHERE id=?";
-    
-    @Before
-    public void setup() {
-        MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
-    }
-    
-    @Test
-    public void assertRegisterIfAbsent() {
-        assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement()), is(1));
-        MySQLPreparedStatement actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertThat(actual.getSql(), is(SQL));
-        assertThat(actual.getSqlStatement().getParameterCount(), is(1));
-    }
-    
-    @Test
-    public void assertPrepareSameSQL() {
-        assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement()), is(1));
-        assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement()), is(2));
-        MySQLPreparedStatement actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertThat(actual.getSql(), is(SQL));
-        assertThat(actual.getSqlStatement().getParameterCount(), is(1));
-        actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertThat(actual.getSql(), is(SQL));
-        assertThat(actual.getSqlStatement().getParameterCount(), is(1));
-    }
-    
-    @Test
-    public void assertCloseStatement() {
-        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement());
-        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).closeStatement(1);
-        MySQLPreparedStatement actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
-        assertNull(actual);
-    }
-    
-    private MySQLSelectStatement prepareSQLStatement() {
-        MySQLSelectStatement result = new MySQLSelectStatement();
-        result.setParameterCount(1);
-        return result;
-    }
-    
-    @After
-    public void tearDown() {
-        MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
-    }
-}
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
index 2a57d47c9ae..bf6677eb23e 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
@@ -21,10 +21,7 @@ import io.netty.buffer.Unpooled;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
-import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
@@ -39,14 +36,6 @@ import static org.junit.Assert.assertTrue;
 
 public final class MySQLComStmtExecutePacketTest {
     
-    @Before
-    public void setup() {
-        MySQLPreparedStatementRegistry.getInstance().registerConnection(1);
-        MySQLSelectStatement sqlStatement = new MySQLSelectStatement();
-        sqlStatement.setParameterCount(1);
-        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("SELECT id FROM tbl WHERE id=?", sqlStatement);
-    }
-    
     @Test
     public void assertNewWithoutParameter() throws SQLException {
         byte[] data = {0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x00, 0x00, 0x00};
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index c807e19b46d..75b0254e603 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -78,6 +78,8 @@ public final class ConnectionSession {
     
     private final Map<String, CursorStatementContext> cursorDefinitions = new ConcurrentHashMap<>();
     
+    private final PreparedStatementRegistry preparedStatementRegistry = new PreparedStatementRegistry();
+    
     public ConnectionSession(final DatabaseType databaseType, final TransactionType initialTransactionType, final AttributeMap attributeMap) {
         this.databaseType = databaseType;
         transactionStatus = new TransactionStatus(initialTransactionType);
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
similarity index 57%
copy from shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
copy to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
index 1b356b54413..daeb62cc0f2 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
@@ -15,27 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+package org.apache.shardingsphere.proxy.backend.session;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
-import java.util.Collections;
-import java.util.List;
-
 /**
- * Binary prepared statement for MySQL.
+ * Logic prepared statement for clients of ShardingSphere-Proxy.
  */
-@RequiredArgsConstructor
-@Getter
-@Setter
-public final class MySQLPreparedStatement {
+public interface PreparedStatement {
     
-    private final String sql;
+    /**
+     * Get SQL of prepared statement.
+     *
+     * @return SQL
+     */
+    String getSql();
     
-    private final SQLStatement sqlStatement;
+    /**
+     * Get {@link SQLStatement} of prepared statement.
+     *
+     * @return {@link SQLStatement}
+     */
+    SQLStatement getSqlStatement();
     
-    private List<MySQLPreparedStatementParameterType> parameterTypes = Collections.emptyList();
+    /**
+     * Get {@link SQLStatementContext} of prepared statement.
+     *
+     * @return {@link SQLStatementContext}
+     */
+    SQLStatementContext<?> getSqlStatementContext();
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java
new file mode 100644
index 00000000000..b474a5deaa3
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link PreparedStatement} registry for {@link ConnectionSession}.
+ */
+public final class PreparedStatementRegistry {
+    
+    private final Map<Object, PreparedStatement> preparedStatements = new ConcurrentHashMap<>();
+    
+    /**
+     * Add {@link PreparedStatement} into registry.
+     *
+     * @param statementId statement ID
+     * @param preparedStatement prepared statement
+     */
+    public void addPreparedStatement(final Object statementId, final PreparedStatement preparedStatement) {
+        preparedStatements.put(statementId, preparedStatement);
+    }
+    
+    /**
+     * Get prepared statement by statement ID.
+     *
+     * @param <T> implementation of {@link PreparedStatement}
+     * @param statementId statement ID
+     * @return {@link PreparedStatement}
+     */
+    @SuppressWarnings("unchecked")
+    public <T extends PreparedStatement> T getPreparedStatement(final Object statementId) {
+        return (T) preparedStatements.get(statementId);
+    }
+    
+    /**
+     * Remove {@link PreparedStatement} from registry.
+     *
+     * @param statementId statement ID
+     */
+    public void removePreparedStatement(final Object statementId) {
+        preparedStatements.remove(statementId);
+    }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java
new file mode 100644
index 00000000000..6ad372c6973
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class PreparedStatementRegistryTest {
+    
+    @Test
+    public void assertAddAndGetAndClosePreparedStatement() {
+        PreparedStatement expected = new DummyPreparedStatement();
+        PreparedStatementRegistry registry = new PreparedStatementRegistry();
+        registry.addPreparedStatement(1, expected);
+        assertThat(registry.getPreparedStatement(1), is(expected));
+        registry.removePreparedStatement(1);
+        assertNull(registry.getPreparedStatement(1));
+    }
+    
+    private static class DummyPreparedStatement implements PreparedStatement {
+        
+        @Override
+        public String getSql() {
+            throw new UnsupportedOperationException();
+        }
+        
+        @Override
+        public SQLStatement getSqlStatement() {
+            throw new UnsupportedOperationException();
+        }
+        
+        @Override
+        public SQLStatementContext<?> getSqlStatementContext() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index 6df5b49db04..92ed5880826 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
 import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -59,7 +59,7 @@ public final class MySQLFrontendEngine implements DatabaseProtocolFrontendEngine
     
     @Override
     public void release(final ConnectionSession connectionSession) {
-        MySQLPreparedStatementRegistry.getInstance().unregisterConnection(connectionSession.getConnectionId());
+        MySQLStatementIDGenerator.getInstance().unregisterConnection(connectionSession.getConnectionId());
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
index 0ec0494b7d2..8a5a0fcd738 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConnectionPhase
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerErrorCode;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLStatusFlag;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthSwitchRequestPacket;
@@ -68,7 +68,7 @@ public final class MySQLAuthenticationEngine implements AuthenticationEngine {
         int result = ConnectionIdGenerator.getInstance().nextId();
         connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;
         context.writeAndFlush(new MySQLHandshakePacket(result, authenticationHandler.getAuthPluginData()));
-        MySQLPreparedStatementRegistry.getInstance().registerConnection(result);
+        MySQLStatementIDGenerator.getInstance().registerConnection(result);
         return result;
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index 5721171c8a5..9567549c33b 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -53,7 +53,7 @@ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
     
     @Override
     public MySQLCommandPacket getCommandPacket(final PacketPayload payload, final CommandPacketType type, final ConnectionSession connectionSession) throws SQLException {
-        return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload, connectionSession.getConnectionId());
+        return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload, connectionSession);
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
index 70e296d8a0c..2eed6d6fb6a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
@@ -82,7 +82,7 @@ public final class MySQLCommandExecutorFactory {
             case COM_STMT_RESET:
                 return new MySQLComStmtResetExecutor((MySQLComStmtResetPacket) commandPacket, connectionSession);
             case COM_STMT_CLOSE:
-                return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) commandPacket, connectionSession.getConnectionId());
+                return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) commandPacket, connectionSession);
             case COM_SET_OPTION:
                 return new MySQLComSetOptionExecutor((MySQLComSetOptionPacket) commandPacket, connectionSession);
             default:
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
index eec4205d8f4..ed514ac4224 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
@@ -26,8 +26,6 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -35,6 +33,8 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 
 import java.sql.SQLException;
 
@@ -49,11 +49,11 @@ public final class MySQLCommandPacketFactory {
      *
      * @param commandPacketType command packet type for MySQL
      * @param payload packet payload for MySQL
-     * @param connectionId connection ID
+     * @param connectionSession connection session
      * @return created instance
      * @throws SQLException SQL exception
      */
-    public static MySQLCommandPacket newInstance(final MySQLCommandPacketType commandPacketType, final MySQLPacketPayload payload, final int connectionId) throws SQLException {
+    public static MySQLCommandPacket newInstance(final MySQLCommandPacketType commandPacketType, final MySQLPacketPayload payload, final ConnectionSession connectionSession) throws SQLException {
         switch (commandPacketType) {
             case COM_QUIT:
                 return new MySQLComQuitPacket();
@@ -66,8 +66,7 @@ public final class MySQLCommandPacketFactory {
             case COM_STMT_PREPARE:
                 return new MySQLComStmtPreparePacket(payload);
             case COM_STMT_EXECUTE:
-                MySQLPreparedStatement preparedStatement = MySQLPreparedStatementRegistry.getInstance()
-                        .getConnectionPreparedStatements(connectionId).get(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
+                MySQLPreparedStatement preparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
                 return new MySQLComStmtExecutePacket(payload, preparedStatement.getSqlStatement().getParameterCount());
             case COM_STMT_RESET:
                 return new MySQLComStmtResetPacket(payload);
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
similarity index 72%
rename from shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
rename to shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
index 1b356b54413..9588539f3a2 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
@@ -15,11 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.util.Collections;
@@ -31,11 +34,13 @@ import java.util.List;
 @RequiredArgsConstructor
 @Getter
 @Setter
-public final class MySQLPreparedStatement {
+public final class MySQLPreparedStatement implements PreparedStatement {
     
     private final String sql;
     
     private final SQLStatement sqlStatement;
     
+    private final SQLStatementContext<?> sqlStatementContext;
+    
     private List<MySQLPreparedStatementParameterType> parameterTypes = Collections.emptyList();
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java
new file mode 100644
index 00000000000..d43619c7a99
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Statement ID generator for MySQL, keeping one statement ID counter per connection ID.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class MySQLStatementIDGenerator {
+    
+    private static final MySQLStatementIDGenerator INSTANCE = new MySQLStatementIDGenerator();
+    
+    private final Map<Integer, AtomicInteger> connectionRegistry = new ConcurrentHashMap<>();
+    
+    /**
+     * Get statement ID generator instance.
+     *
+     * @return statement ID generator instance
+     */
+    public static MySQLStatementIDGenerator getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register connection with a fresh counter, replacing any counter already stored for the connection ID.
+     *
+     * @param connectionId connection ID
+     */
+    public void registerConnection(final int connectionId) {
+        connectionRegistry.put(connectionId, new AtomicInteger());
+    }
+    
+    /**
+     * Generate next statement ID for connection, the first generated ID after registration is 1.
+     *
+     * @param connectionId connection ID, must be registered beforehand otherwise a NullPointerException is thrown
+     * @return generated statement ID for prepared statement
+     */
+    public int nextStatementId(final int connectionId) {
+        return connectionRegistry.get(connectionId).incrementAndGet();
+    }
+    
+    /**
+     * Unregister connection and discard its counter, does nothing if the connection ID is not registered.
+     *
+     * @param connectionId connection ID
+     */
+    public void unregisterConnection(final int connectionId) {
+        connectionRegistry.remove(connectionId);
+    }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
index 9bf33030490..df9bb3962a4 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
@@ -18,10 +18,9 @@
 package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry.MySQLConnectionPreparedStatements;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
 
 import java.util.Collection;
@@ -35,18 +34,11 @@ public final class MySQLComStmtCloseExecutor implements CommandExecutor {
     
     private final MySQLComStmtClosePacket packet;
     
-    private final int connectionId;
+    private final ConnectionSession connectionSession;
     
     @Override
     public Collection<DatabasePacket<?>> execute() {
-        MySQLConnectionPreparedStatements connectionPreparedStatements = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionId);
-        if (!connectionReleased(connectionPreparedStatements)) {
-            connectionPreparedStatements.closeStatement(packet.getStatementId());
-        }
+        connectionSession.getPreparedStatementRegistry().removePreparedStatement(packet.getStatementId());
         return Collections.emptyList();
     }
-    
-    private boolean connectionReleased(final MySQLConnectionPreparedStatements connectionPreparedStatements) {
-        return null == connectionPreparedStatements;
-    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index bd867db3ebe..5aaf00ef658 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -25,12 +25,10 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.aware.ParameterAware;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -53,6 +51,7 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFa
 import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
@@ -88,19 +87,21 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
     @Override
     public Collection<DatabasePacket<?>> execute() throws SQLException {
         MySQLPreparedStatement preparedStatement = updateAndGetPreparedStatement();
-        String databaseName = connectionSession.getDatabaseName();
-        MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLStatement sqlStatement = preparedStatement.getSqlStatement();
         if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
             connectionSession.getBackendConnection().handleAutoCommit();
         }
         List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes());
-        SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(), parameters,
-                sqlStatement, connectionSession.getDefaultDatabaseName());
+        SQLStatementContext<?> sqlStatementContext = preparedStatement.getSqlStatementContext();
+        if (sqlStatementContext instanceof ParameterAware) {
+            ((ParameterAware) sqlStatementContext).setUpParameters(parameters);
+        }
         // TODO optimize SQLStatementDatabaseHolder
         if (sqlStatementContext instanceof TableAvailable) {
             ((TableAvailable) sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(SQLStatementDatabaseHolder::set);
         }
+        String databaseName = connectionSession.getDatabaseName();
+        MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLCheckEngine.check(sqlStatement, Collections.emptyList(), getRules(databaseName), databaseName, metaDataContexts.getMetaData().getDatabases(), connectionSession.getGrantee());
         // TODO Refactor the following branch
         if (sqlStatement instanceof TCLStatement) {
@@ -116,7 +117,7 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
     }
     
     private MySQLPreparedStatement updateAndGetPreparedStatement() {
-        MySQLPreparedStatement result = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).get(packet.getStatementId());
+        MySQLPreparedStatement result = connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
         if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == packet.getNewParametersBoundFlag()) {
             result.setParameterTypes(packet.getNewParameterTypes());
         }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
index 15cf078b362..99c39c12287 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
@@ -22,12 +22,13 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
 import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -38,6 +39,7 @@ import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
 import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 
@@ -70,7 +72,10 @@ public final class MySQLComStmtPrepareExecutor implements CommandExecutor {
             throw new UnsupportedPreparedStatementException();
         }
         int projectionCount = getProjectionCount(sqlStatement);
-        int statementId = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).prepareStatement(packet.getSql(), sqlStatement);
+        int statementId = MySQLStatementIDGenerator.getInstance().nextStatementId(connectionSession.getConnectionId());
+        SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabases(),
+                sqlStatement, connectionSession.getDefaultDatabaseName());
+        connectionSession.getPreparedStatementRegistry().addPreparedStatement(statementId, new MySQLPreparedStatement(packet.getSql(), sqlStatement, sqlStatementContext));
         return createPackets(statementId, projectionCount, sqlStatement.getParameterCount());
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
index 99ef170e35f..276eb511ff9 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -32,6 +31,9 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
 import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,35 +51,36 @@ import static org.mockito.Mockito.when;
 @RunWith(MockitoJUnitRunner.class)
 public final class MySQLCommandPacketFactoryTest {
     
-    private static final int CONNECTION_ID = 1;
-    
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private MySQLPacketPayload payload;
     
+    @Mock
+    private ConnectionSession connectionSession;
+    
     @Test
     public void assertNewInstanceWithComQuitPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT, payload, CONNECTION_ID), instanceOf(MySQLComQuitPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT, payload, connectionSession), instanceOf(MySQLComQuitPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComInitDbPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB, payload, CONNECTION_ID), instanceOf(MySQLComInitDbPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB, payload, connectionSession), instanceOf(MySQLComInitDbPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComFieldListPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST, payload, CONNECTION_ID), instanceOf(MySQLComFieldListPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST, payload, connectionSession), instanceOf(MySQLComFieldListPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComQueryPacket() throws SQLException {
         when(payload.readStringEOF()).thenReturn("SHOW TABLES");
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY, payload, CONNECTION_ID), instanceOf(MySQLComQueryPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY, payload, connectionSession), instanceOf(MySQLComQueryPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtPreparePacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE, payload, CONNECTION_ID), instanceOf(MySQLComStmtPreparePacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE, payload, connectionSession), instanceOf(MySQLComStmtPreparePacket.class));
     }
     
     @Test
@@ -85,139 +88,139 @@ public final class MySQLCommandPacketFactoryTest {
         when(payload.readInt1()).thenReturn(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST.getValue());
         when(payload.readInt4()).thenReturn(1);
         when(payload.getByteBuf().getIntLE(anyInt())).thenReturn(1);
-        MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
-        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement("SELECT * FROM t_order", new MySQLSelectStatement());
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE, payload, CONNECTION_ID), instanceOf(MySQLComStmtExecutePacket.class));
-        MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
+        PreparedStatementRegistry preparedStatementRegistry = new PreparedStatementRegistry();
+        when(connectionSession.getPreparedStatementRegistry()).thenReturn(preparedStatementRegistry);
+        preparedStatementRegistry.addPreparedStatement(1, new MySQLPreparedStatement("select 1", new MySQLSelectStatement(), null));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE, payload, connectionSession), instanceOf(MySQLComStmtExecutePacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtClosePacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE, payload, CONNECTION_ID), instanceOf(MySQLComStmtClosePacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE, payload, connectionSession), instanceOf(MySQLComStmtClosePacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComPingPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING, payload, CONNECTION_ID), instanceOf(MySQLComPingPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING, payload, connectionSession), instanceOf(MySQLComPingPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComSleepPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComCreateDbPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDropDbPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComRefreshPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComShutDownPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStatisticsPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComProcessInfoPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComConnectPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComProcessKillPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDebugPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComTimePacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDelayedInsertPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComChangeUserPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComBinlogDumpPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComTableDumpPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComConnectOutPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComRegisterSlavePacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtSendLongDataPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtResetPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET, payload, CONNECTION_ID), instanceOf(MySQLComStmtResetPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET, payload, connectionSession), instanceOf(MySQLComStmtResetPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComSetOptionPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION, payload, CONNECTION_ID), instanceOf(MySQLComSetOptionPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION, payload, connectionSession), instanceOf(MySQLComSetOptionPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComStmtFetchPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComDaemonPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComBinlogDumpGTIDPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
     
     @Test
     public void assertNewInstanceWithComResetConnectionPacket() throws SQLException {
-        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+        assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java
new file mode 100644
index 00000000000..28f52a8d966
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class MySQLStatementIDGeneratorTest { // Checks the statement IDs MySQLStatementIDGenerator returns for a registered connection.
+    
+    private static final int CONNECTION_ID = 1; // Connection ID shared by setup(), the test and tearDown().
+    
+    @Before
+    public void setup() {
+        MySQLStatementIDGenerator.getInstance().registerConnection(CONNECTION_ID); // Register the connection before any ID is requested for it.
+    }
+    
+    @Test
+    public void assertNextStatementId() {
+        assertThat(MySQLStatementIDGenerator.getInstance().nextStatementId(CONNECTION_ID), is(1)); // First ID requested after registration is expected to be 1.
+        assertThat(MySQLStatementIDGenerator.getInstance().nextStatementId(CONNECTION_ID), is(2)); // The following request is expected to return 2.
+    }
+    
+    @After
+    public void tearDown() {
+        MySQLStatementIDGenerator.getInstance().unregisterConnection(CONNECTION_ID); // Undo setup(); the generator is reached via getInstance(), so its state presumably outlives a single test - confirm.
+    }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java
new file mode 100644
index 00000000000..a1c2da2706e
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close;
+
+import io.netty.buffer.Unpooled;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
+import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public final class MySQLComStmtCloseExecutorTest { // Checks that executing a MySQLComStmtClosePacket removes the statement from the session's prepared statement registry.
+    
+    @Test
+    public void assertExecute() {
+        MySQLComStmtClosePacket packet = new MySQLComStmtClosePacket(new MySQLPacketPayload(Unpooled.wrappedBuffer(new byte[]{0x01, 0x00, 0x00, 0x00}), StandardCharsets.UTF_8)); // Payload bytes 01 00 00 00; the verify below expects them to be decoded as statement ID 1.
+        ConnectionSession connectionSession = mock(ConnectionSession.class, RETURNS_DEEP_STUBS); // Deep stubs, so getPreparedStatementRegistry() yields a mock that can be verified below.
+        assertThat(new MySQLComStmtCloseExecutor(packet, connectionSession).execute(), is(Collections.emptyList())); // The executor is expected to return no response packets.
+        verify(connectionSession.getPreparedStatementRegistry()).removePreparedStatement(1); // The statement ID carried by the packet must have been removed from the registry.
+    }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
index eb875fe6104..c60ea2db6b1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
@@ -19,14 +19,19 @@ package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.exec
 
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
+import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
@@ -40,6 +45,8 @@ import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicati
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
+import org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -48,6 +55,7 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import org.apache.shardingsphere.proxy.frontend.mysql.ProxyContextRestorer;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.ColumnAssignmentSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.SetAssignmentSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
@@ -66,8 +74,10 @@ import org.mockito.MockedStatic;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Supplier;
 
@@ -75,6 +85,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -82,6 +93,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -106,15 +118,17 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
         ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
         when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
         ProxyContext.init(contextManager);
-        when(connectionSession.getConnectionId()).thenReturn(1);
         when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_GENERAL_CI);
         when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
         when(connectionSession.getDatabaseName()).thenReturn("logic_db");
-        when(connectionSession.getDefaultDatabaseName()).thenReturn("logic_db");
-        MySQLPreparedStatementRegistry.getInstance().registerConnection(1);
-        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("select * from tbl where id = ?", prepareSelectStatement());
-        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("update tbl set col=1 where id = ?", prepareUpdateStatement());
-        MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("commit", new MySQLCommitStatement());
+        SQLStatementContext<?> selectStatementContext = prepareSelectStatementContext();
+        when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(1))
+                .thenReturn(new MySQLPreparedStatement("select * from tbl where id = ?", prepareSelectStatement(), selectStatementContext));
+        UpdateStatementContext updateStatementContext = mock(UpdateStatementContext.class, RETURNS_DEEP_STUBS);
+        when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(2))
+                .thenReturn(new MySQLPreparedStatement("update tbl set col=1 where id = ?", prepareUpdateStatement(), updateStatementContext));
+        when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(3))
+                .thenReturn(new MySQLPreparedStatement("commit", new MySQLCommitStatement(), new CommonSQLStatementContext<>(new MySQLCommitStatement())));
     }
     
     private ShardingSphereDatabase mockDatabase() {
@@ -131,6 +145,12 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
         return sqlStatement;
     }
     
+    private SQLStatementContext<?> prepareSelectStatementContext() { // Builds the mocked context that is paired with the select prepared statement (ID 1).
+        SelectStatementContext result = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS); // Deep stubs allow the chained getTablesContext().getDatabaseName() stubbing below.
+        when(result.getTablesContext().getDatabaseName()).thenReturn(Optional.empty()); // Report no database name for the statement's tables.
+        return result;
+    }
+    
     private MySQLUpdateStatement prepareUpdateStatement() {
         MySQLUpdateStatement result = new MySQLUpdateStatement();
         ColumnSegment columnSegment = new ColumnSegment(0, 0, new IdentifierValue("col"));
@@ -145,6 +165,8 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
         when(packet.getStatementId()).thenReturn(1);
         MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new MySQLComStmtExecuteExecutor(packet, connectionSession);
         when(databaseCommunicationEngine.execute()).thenReturn(new QueryResponseHeader(Collections.singletonList(mock(QueryHeader.class))));
+        when(databaseCommunicationEngine.next()).thenReturn(true, false);
+        when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(new QueryResponseRow(Collections.singletonList(new BinaryQueryResponseCell(Types.INTEGER, 1))));
         Iterator<DatabasePacket<?>> actual;
         try (MockedStatic<DatabaseCommunicationEngineFactory> mockedStatic = mockStatic(DatabaseCommunicationEngineFactory.class, RETURNS_DEEP_STUBS)) {
             mockedStatic.when(() -> DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(any(SQLStatementContext.class), anyString(), anyList(), eq(backendConnection)))
@@ -156,12 +178,19 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
         assertThat(actual.next(), instanceOf(MySQLColumnDefinition41Packet.class));
         assertThat(actual.next(), instanceOf(MySQLEofPacket.class));
         assertFalse(actual.hasNext());
+        assertTrue(mysqlComStmtExecuteExecutor.next());
+        MySQLPacket actualQueryRowPacket = mysqlComStmtExecuteExecutor.getQueryRowPacket();
+        assertThat(actualQueryRowPacket, instanceOf(MySQLBinaryResultSetRowPacket.class));
+        assertThat(actualQueryRowPacket.getSequenceId(), is(4));
+        mysqlComStmtExecuteExecutor.close();
+        verify(databaseCommunicationEngine).close();
     }
     
     @Test
     public void assertIsUpdateResponse() throws SQLException {
         MySQLComStmtExecutePacket packet = mock(MySQLComStmtExecutePacket.class);
         when(packet.getStatementId()).thenReturn(2);
+        when(packet.getNewParametersBoundFlag()).thenReturn(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST);
         MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new MySQLComStmtExecuteExecutor(packet, connectionSession);
         when(databaseCommunicationEngine.execute()).thenReturn(new UpdateResponseHeader(new MySQLUpdateStatement()));
         Iterator<DatabasePacket<?>> actual;
@@ -191,5 +220,7 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
         assertThat(mysqlComStmtExecuteExecutor.getResponseType(), is(ResponseType.UPDATE));
         assertThat(actual.next(), instanceOf(MySQLOKPacket.class));
         assertFalse(actual.hasNext());
+        mysqlComStmtExecuteExecutor.close();
+        verify(textProtocolBackendHandler).close();
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
index 10f4ca924d2..ab5ecebff61 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
@@ -19,9 +19,30 @@ package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prep
 
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
 import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
+import org.apache.shardingsphere.proxy.frontend.mysql.ProxyContextRestorer;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
+import org.apache.shardingsphere.sql.parser.api.CacheOption;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLUpdateStatement;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
@@ -29,11 +50,18 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.sql.SQLException;
+import java.util.Iterator;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class MySQLComStmtPrepareExecutorTest {
+public final class MySQLComStmtPrepareExecutorTest extends ProxyContextRestorer {
     
     @Mock
     private MySQLComStmtPreparePacket packet;
@@ -41,12 +69,73 @@ public final class MySQLComStmtPrepareExecutorTest {
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private ConnectionSession connectionSession;
     
+    @Before
+    public void setup() { // Runs before each test: installs a mocked ContextManager and stubs the connection session.
+        ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
+        prepareSQLParser(); // Must follow ProxyContext.init, because it reads the ContextManager back from ProxyContext.
+        when(connectionSession.getPreparedStatementRegistry()).thenReturn(new PreparedStatementRegistry()); // A real registry, so tests can read back what the executor stored.
+        when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI); // Character set attribute returned by the mocked session for every test.
+    }
+    
+    private void prepareSQLParser() { // Stubs the mocked metadata so that a SQLParserRule and a MySQL protocol type are returned.
+        ContextManager contextManager = ProxyContext.getInstance().getContextManager(); // The deep-stub ContextManager installed by setup().
+        MetaDataContexts metaDataContexts = contextManager.getMetaDataContexts();
+        when(metaDataContexts.getMetaData().getGlobalRuleMetaData()).thenReturn(mock(ShardingSphereRuleMetaData.class));
+        CacheOption cacheOption = new CacheOption(1024, 1024); // The same option object is passed for both cache arguments of SQLParserRuleConfiguration below.
+        when(metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class))
+                .thenReturn(new SQLParserRule(new SQLParserRuleConfiguration(false, cacheOption, cacheOption)));
+        when(metaDataContexts.getMetaData().getDatabases().get(connectionSession.getDatabaseName()).getProtocolType()).thenReturn(new MySQLDatabaseType()); // The database looked up by the session's database name reports MySQL as its protocol type.
+    }
+    
     @Test(expected = UnsupportedPreparedStatementException.class)
     public void assertPrepareMultiStatements() throws SQLException {
         when(packet.getSql()).thenReturn("update t set v=v+1 where id=1;update t set v=v+1 where id=2;update t set v=v+1 where id=3");
-        when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI);
         when(connectionSession.getAttributeMap().hasAttr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS)).thenReturn(true);
         when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS).get()).thenReturn(0);
         new MySQLComStmtPrepareExecutor(packet, connectionSession).execute();
     }
+    
+    @Test
+    public void assertPrepareSelectStatement() throws SQLException { // Prepares a parameterized select and checks the response packets and the registered statement.
+        String sql = "select name from t where id = ?";
+        when(packet.getSql()).thenReturn(sql);
+        when(connectionSession.getConnectionId()).thenReturn(1);
+        MySQLStatementIDGenerator.getInstance().registerConnection(1); // Same ID as the stubbed getConnectionId() above.
+        Iterator<DatabasePacket<?>> actualIterator = new MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
+        assertThat(actualIterator.next(), instanceOf(MySQLComStmtPrepareOKPacket.class));
+        assertThat(actualIterator.next(), instanceOf(MySQLColumnDefinition41Packet.class)); // Presumably the definition of the single "?" parameter - confirm.
+        assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+        assertThat(actualIterator.next(), instanceOf(MySQLColumnDefinition41Packet.class)); // Presumably the definition of the result column - confirm.
+        assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+        assertFalse(actualIterator.hasNext());
+        MySQLPreparedStatement actualPreparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(1); // Statement ID 1: presumably the first ID issued for the newly registered connection.
+        assertThat(actualPreparedStatement.getSql(), is(sql));
+        assertThat(actualPreparedStatement.getSqlStatement(), instanceOf(MySQLSelectStatement.class));
+        assertThat(actualPreparedStatement.getSqlStatementContext(), instanceOf(SelectStatementContext.class));
+        MySQLStatementIDGenerator.getInstance().unregisterConnection(1); // NOTE(review): not reached when an assertion above fails; consider an @After method or try/finally.
+    }
+    
+    @Test
+    public void assertPrepareUpdateStatement() throws SQLException { // Prepares a parameterized update and checks the response packets and the registered statement.
+        String sql = "update t set v = ?";
+        when(packet.getSql()).thenReturn(sql);
+        when(connectionSession.getConnectionId()).thenReturn(1);
+        MySQLStatementIDGenerator.getInstance().registerConnection(1); // Same ID as the stubbed getConnectionId() above.
+        Iterator<DatabasePacket<?>> actualIterator = new MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
+        assertThat(actualIterator.next(), instanceOf(MySQLComStmtPrepareOKPacket.class));
+        assertThat(actualIterator.next(), instanceOf(MySQLColumnDefinition41Packet.class)); // Presumably the definition of the single "?" parameter - confirm.
+        assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+        assertFalse(actualIterator.hasNext()); // Unlike the select case, no second column-definition/EOF pair is expected.
+        MySQLPreparedStatement actualPreparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(1); // Statement ID 1: presumably the first ID issued for the newly registered connection.
+        assertThat(actualPreparedStatement.getSql(), is(sql));
+        assertThat(actualPreparedStatement.getSqlStatement(), instanceOf(MySQLUpdateStatement.class));
+        assertThat(actualPreparedStatement.getSqlStatementContext(), instanceOf(UpdateStatementContext.class));
+        MySQLStatementIDGenerator.getInstance().unregisterConnection(1); // NOTE(review): not reached when an assertion above fails; consider an @After method or try/finally.
+    }
+    
+    @Test(expected = UnsupportedPreparedStatementException.class)
+    public void assertPrepareNotAllowedStatement() throws SQLException { // Preparing "begin" is expected to be rejected with UnsupportedPreparedStatementException.
+        when(packet.getSql()).thenReturn("begin");
+        new MySQLComStmtPrepareExecutor(packet, connectionSession).execute(); // Expected to throw, so no assertions follow.
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
index 3052f9847ee..48880226557 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
@@ -27,13 +27,11 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
 import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
 import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
 import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.aware.ParameterAware;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.binder.type.TableAvailable;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -54,6 +52,7 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
 import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
 import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
 import org.apache.shardingsphere.proxy.frontend.reactive.command.executor.ReactiveCommandExecutor;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -90,16 +89,18 @@ public final class ReactiveMySQLComStmtExecuteExecutor implements ReactiveComman
     @Override
     public Future<Collection<DatabasePacket<?>>> executeFuture() {
         MySQLPreparedStatement preparedStatement = updateAndGetPreparedStatement();
-        String databaseName = connectionSession.getDatabaseName();
-        MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
-        SQLStatement sqlStatement = preparedStatement.getSqlStatement();
         List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes());
-        SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(), parameters,
-                sqlStatement, connectionSession.getDefaultDatabaseName());
+        SQLStatementContext<?> sqlStatementContext = preparedStatement.getSqlStatementContext();
+        if (sqlStatementContext instanceof ParameterAware) {
+            ((ParameterAware) sqlStatementContext).setUpParameters(parameters);
+        }
         // TODO optimize SQLStatementDatabaseHolder
         if (sqlStatementContext instanceof TableAvailable) {
             ((TableAvailable) sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(SQLStatementDatabaseHolder::set);
         }
+        SQLStatement sqlStatement = preparedStatement.getSqlStatement();
+        String databaseName = connectionSession.getDatabaseName();
+        MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
         SQLCheckEngine.check(sqlStatement, Collections.emptyList(), getRules(databaseName), databaseName, metaDataContexts.getMetaData().getDatabases(), connectionSession.getGrantee());
         int characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
         // TODO Refactor the following branch
@@ -130,7 +131,7 @@ public final class ReactiveMySQLComStmtExecuteExecutor implements ReactiveComman
     }
     
     private MySQLPreparedStatement updateAndGetPreparedStatement() {
-        MySQLPreparedStatement result = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).get(packet.getStatementId());
+        MySQLPreparedStatement result = connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
         if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == packet.getNewParametersBoundFlag()) {
             result.setParameterTypes(packet.getNewParameterTypes());
         }