You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/06/23 12:42:04 UTC
[shardingsphere] branch master updated: Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f9fc2f352f5 Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)
f9fc2f352f5 is described below
commit f9fc2f352f5806f2d553d512e2704dca6aa2e208
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Thu Jun 23 20:41:55 2022 +0800
Move MySQLPreparedStatementRegistry into ConnectionSession (#18545)
* Refactor PreparedStatement of MySQL Proxy
* Complete tests in db-protocol-mysql
* Complete MySQLCommandPacketFactoryTest
* Complete MySQLComStmtExecuteExecutorTest
* Add PreparedStatementRegistryTest
* Add MySQLComStmtCloseExecutorTest
* Complete MySQLComStmtPrepareExecutorTest
* Fix code style in MySQLComStmtPrepareExecutorTest
* Move MySQLStatementIDGenerator from protocol to frontend
---
.../binary/MySQLPreparedStatementRegistry.java | 114 ---------------------
.../binary/MySQLPreparedStatementRegistryTest.java | 78 --------------
.../execute/MySQLComStmtExecutePacketTest.java | 11 --
.../proxy/backend/session/ConnectionSession.java | 2 +
.../proxy/backend/session/PreparedStatement.java | 37 ++++---
.../backend/session/PreparedStatementRegistry.java | 60 +++++++++++
.../session/PreparedStatementRegistryTest.java | 57 +++++++++++
.../proxy/frontend/mysql/MySQLFrontendEngine.java | 4 +-
.../authentication/MySQLAuthenticationEngine.java | 4 +-
.../mysql/command/MySQLCommandExecuteEngine.java | 2 +-
.../mysql/command/MySQLCommandExecutorFactory.java | 2 +-
.../mysql/command/MySQLCommandPacketFactory.java | 11 +-
.../query/binary/MySQLPreparedStatement.java | 9 +-
.../query/binary/MySQLStatementIDGenerator.java | 73 +++++++++++++
.../binary/close/MySQLComStmtCloseExecutor.java | 14 +--
.../execute/MySQLComStmtExecuteExecutor.java | 17 +--
.../prepare/MySQLComStmtPrepareExecutor.java | 9 +-
.../command/MySQLCommandPacketFactoryTest.java | 79 +++++++-------
.../binary/MySQLStatementIDGeneratorTest.java | 46 +++++++++
.../close/MySQLComStmtCloseExecutorTest.java | 44 ++++++++
.../execute/MySQLComStmtExecuteExecutorTest.java | 45 ++++++--
.../prepare/MySQLComStmtPrepareExecutorTest.java | 93 ++++++++++++++++-
.../ReactiveMySQLComStmtExecuteExecutor.java | 19 ++--
23 files changed, 521 insertions(+), 309 deletions(-)
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
deleted file mode 100644
index 273e56dfccf..00000000000
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistry.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * MySQL prepared statement registry.
- */
-@NoArgsConstructor(access = AccessLevel.NONE)
-public final class MySQLPreparedStatementRegistry {
-
- private static final MySQLPreparedStatementRegistry INSTANCE = new MySQLPreparedStatementRegistry();
-
- private final ConcurrentMap<Integer, MySQLConnectionPreparedStatements> connectionRegistry = new ConcurrentHashMap<>(8192, 1);
-
- /**
- * Get prepared statement registry instance.
- *
- * @return prepared statement registry instance
- */
- public static MySQLPreparedStatementRegistry getInstance() {
- return INSTANCE;
- }
-
- /**
- * Register connection.
- *
- * @param connectionId connection ID
- */
- public void registerConnection(final int connectionId) {
- connectionRegistry.put(connectionId, new MySQLConnectionPreparedStatements());
- }
-
- /**
- * Get connection prepared statements.
- *
- * @param connectionId connection ID
- * @return MySQL connection prepared statements
- */
- public MySQLConnectionPreparedStatements getConnectionPreparedStatements(final int connectionId) {
- return connectionRegistry.get(connectionId);
- }
-
- /**
- * Unregister connection.
- *
- * @param connectionId connection ID
- */
- public void unregisterConnection(final int connectionId) {
- connectionRegistry.remove(connectionId);
- }
-
- public static class MySQLConnectionPreparedStatements {
-
- private final Map<Integer, MySQLPreparedStatement> preparedStatements = new ConcurrentHashMap<>(16384, 1);
-
- private final AtomicInteger sequence = new AtomicInteger();
-
- /**
- * Prepare statement.
- *
- * @param sql SQL
- * @param sqlStatement sql statement of prepared statement
- * @return statement ID
- */
- public int prepareStatement(final String sql, final SQLStatement sqlStatement) {
- int result = sequence.incrementAndGet();
- preparedStatements.put(result, new MySQLPreparedStatement(sql, sqlStatement));
- return result;
- }
-
- /**
- * Get prepared statement.
- *
- * @param statementId statement ID
- * @return prepared statement
- */
- public MySQLPreparedStatement get(final int statementId) {
- return preparedStatements.get(statementId);
- }
-
- /**
- * Close statement.
- *
- * @param statementId statement ID
- */
- public void closeStatement(final int statementId) {
- preparedStatements.remove(statementId);
- }
- }
-}
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
deleted file mode 100644
index 3808fa3a86b..00000000000
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatementRegistryTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
-
-import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-public final class MySQLPreparedStatementRegistryTest {
-
- private static final int CONNECTION_ID = 1;
-
- private static final String SQL = "SELECT * FROM tbl WHERE id=?";
-
- @Before
- public void setup() {
- MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
- }
-
- @Test
- public void assertRegisterIfAbsent() {
- assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement()), is(1));
- MySQLPreparedStatement actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
- assertThat(actual.getSql(), is(SQL));
- assertThat(actual.getSqlStatement().getParameterCount(), is(1));
- }
-
- @Test
- public void assertPrepareSameSQL() {
- assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement()), is(1));
- assertThat(MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement()), is(2));
- MySQLPreparedStatement actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
- assertThat(actual.getSql(), is(SQL));
- assertThat(actual.getSqlStatement().getParameterCount(), is(1));
- actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
- assertThat(actual.getSql(), is(SQL));
- assertThat(actual.getSqlStatement().getParameterCount(), is(1));
- }
-
- @Test
- public void assertCloseStatement() {
- MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement(SQL, prepareSQLStatement());
- MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).closeStatement(1);
- MySQLPreparedStatement actual = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).get(1);
- assertNull(actual);
- }
-
- private MySQLSelectStatement prepareSQLStatement() {
- MySQLSelectStatement result = new MySQLSelectStatement();
- result.setParameterCount(1);
- return result;
- }
-
- @After
- public void tearDown() {
- MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
- }
-}
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
index 2a57d47c9ae..bf6677eb23e 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/test/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/execute/MySQLComStmtExecutePacketTest.java
@@ -21,10 +21,7 @@ import io.netty.buffer.Unpooled;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
-import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
-import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
@@ -39,14 +36,6 @@ import static org.junit.Assert.assertTrue;
public final class MySQLComStmtExecutePacketTest {
- @Before
- public void setup() {
- MySQLPreparedStatementRegistry.getInstance().registerConnection(1);
- MySQLSelectStatement sqlStatement = new MySQLSelectStatement();
- sqlStatement.setParameterCount(1);
- MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("SELECT id FROM tbl WHERE id=?", sqlStatement);
- }
-
@Test
public void assertNewWithoutParameter() throws SQLException {
byte[] data = {0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x00, 0x00, 0x00};
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
index c807e19b46d..75b0254e603 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java
@@ -78,6 +78,8 @@ public final class ConnectionSession {
private final Map<String, CursorStatementContext> cursorDefinitions = new ConcurrentHashMap<>();
+ private final PreparedStatementRegistry preparedStatementRegistry = new PreparedStatementRegistry();
+
public ConnectionSession(final DatabaseType databaseType, final TransactionType initialTransactionType, final AttributeMap attributeMap) {
this.databaseType = databaseType;
transactionStatus = new TransactionStatus(initialTransactionType);
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
similarity index 57%
copy from shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
copy to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
index 1b356b54413..daeb62cc0f2 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatement.java
@@ -15,27 +15,34 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+package org.apache.shardingsphere.proxy.backend.session;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import java.util.Collections;
-import java.util.List;
-
/**
- * Binary prepared statement for MySQL.
+ * Logic prepared statement for clients of ShardingSphere-Proxy.
*/
-@RequiredArgsConstructor
-@Getter
-@Setter
-public final class MySQLPreparedStatement {
+public interface PreparedStatement {
- private final String sql;
+ /**
+ * Get SQL of prepared statement.
+ *
+ * @return SQL
+ */
+ String getSql();
- private final SQLStatement sqlStatement;
+ /**
+ * Get {@link SQLStatement} of prepared statement.
+ *
+ * @return {@link SQLStatement}
+ */
+ SQLStatement getSqlStatement();
- private List<MySQLPreparedStatementParameterType> parameterTypes = Collections.emptyList();
+ /**
+ * Get {@link SQLStatementContext} of prepared statement.
+ *
+ * @return {@link SQLStatementContext}
+ */
+ SQLStatementContext<?> getSqlStatementContext();
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java
new file mode 100644
index 00000000000..b474a5deaa3
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link PreparedStatement} registry for {@link ConnectionSession}.
+ */
+public final class PreparedStatementRegistry {
+
+ private final Map<Object, PreparedStatement> preparedStatements = new ConcurrentHashMap<>();
+
+ /**
+ * Add {@link PreparedStatement} into registry.
+ *
+ * @param statementId statement ID
+ * @param preparedStatement prepared statement
+ */
+ public void addPreparedStatement(final Object statementId, final PreparedStatement preparedStatement) {
+ preparedStatements.put(statementId, preparedStatement);
+ }
+
+ /**
+ * Get prepared statement by statement ID.
+ *
+ * @param <T> implementation of {@link PreparedStatement}
+ * @param statementId statement ID
+ * @return {@link PreparedStatement}
+ */
+ @SuppressWarnings("unchecked")
+ public <T extends PreparedStatement> T getPreparedStatement(final Object statementId) {
+ return (T) preparedStatements.get(statementId);
+ }
+
+ /**
+ * Remove {@link PreparedStatement} from registry.
+ *
+ * @param statementId statement ID
+ */
+ public void removePreparedStatement(final Object statementId) {
+ preparedStatements.remove(statementId);
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java
new file mode 100644
index 00000000000..6ad372c6973
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/PreparedStatementRegistryTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.session;
+
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class PreparedStatementRegistryTest {
+
+ @Test
+ public void assertAddAndGetAndClosePreparedStatement() {
+ PreparedStatement expected = new DummyPreparedStatement();
+ PreparedStatementRegistry registry = new PreparedStatementRegistry();
+ registry.addPreparedStatement(1, expected);
+ assertThat(registry.getPreparedStatement(1), is(expected));
+ registry.removePreparedStatement(1);
+ assertNull(registry.getPreparedStatement(1));
+ }
+
+ private static class DummyPreparedStatement implements PreparedStatement {
+
+ @Override
+ public String getSql() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SQLStatement getSqlStatement() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SQLStatementContext<?> getSqlStatementContext() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
index 6df5b49db04..92ed5880826 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
import org.apache.shardingsphere.db.protocol.mysql.codec.MySQLPacketCodecEngine;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
@@ -59,7 +59,7 @@ public final class MySQLFrontendEngine implements DatabaseProtocolFrontendEngine
@Override
public void release(final ConnectionSession connectionSession) {
- MySQLPreparedStatementRegistry.getInstance().unregisterConnection(connectionSession.getConnectionId());
+ MySQLStatementIDGenerator.getInstance().unregisterConnection(connectionSession.getConnectionId());
}
@Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
index 0ec0494b7d2..8a5a0fcd738 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/authentication/MySQLAuthenticationEngine.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConnectionPhase
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerErrorCode;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLStatusFlag;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthSwitchRequestPacket;
@@ -68,7 +68,7 @@ public final class MySQLAuthenticationEngine implements AuthenticationEngine {
int result = ConnectionIdGenerator.getInstance().nextId();
connectionPhase = MySQLConnectionPhase.AUTH_PHASE_FAST_PATH;
context.writeAndFlush(new MySQLHandshakePacket(result, authenticationHandler.getAuthPluginData()));
- MySQLPreparedStatementRegistry.getInstance().registerConnection(result);
+ MySQLStatementIDGenerator.getInstance().registerConnection(result);
return result;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index 5721171c8a5..9567549c33b 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -53,7 +53,7 @@ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
@Override
public MySQLCommandPacket getCommandPacket(final PacketPayload payload, final CommandPacketType type, final ConnectionSession connectionSession) throws SQLException {
- return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload, connectionSession.getConnectionId());
+ return MySQLCommandPacketFactory.newInstance((MySQLCommandPacketType) type, (MySQLPacketPayload) payload, connectionSession);
}
@Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
index 70e296d8a0c..2eed6d6fb6a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecutorFactory.java
@@ -82,7 +82,7 @@ public final class MySQLCommandExecutorFactory {
case COM_STMT_RESET:
return new MySQLComStmtResetExecutor((MySQLComStmtResetPacket) commandPacket, connectionSession);
case COM_STMT_CLOSE:
- return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) commandPacket, connectionSession.getConnectionId());
+ return new MySQLComStmtCloseExecutor((MySQLComStmtClosePacket) commandPacket, connectionSession);
case COM_SET_OPTION:
return new MySQLComSetOptionExecutor((MySQLComSetOptionPacket) commandPacket, connectionSession);
default:
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
index eec4205d8f4..ed514ac4224 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactory.java
@@ -26,8 +26,6 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -35,6 +33,8 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
import java.sql.SQLException;
@@ -49,11 +49,11 @@ public final class MySQLCommandPacketFactory {
*
* @param commandPacketType command packet type for MySQL
* @param payload packet payload for MySQL
- * @param connectionId connection ID
+ * @param connectionSession connection session
* @return created instance
* @throws SQLException SQL exception
*/
- public static MySQLCommandPacket newInstance(final MySQLCommandPacketType commandPacketType, final MySQLPacketPayload payload, final int connectionId) throws SQLException {
+ public static MySQLCommandPacket newInstance(final MySQLCommandPacketType commandPacketType, final MySQLPacketPayload payload, final ConnectionSession connectionSession) throws SQLException {
switch (commandPacketType) {
case COM_QUIT:
return new MySQLComQuitPacket();
@@ -66,8 +66,7 @@ public final class MySQLCommandPacketFactory {
case COM_STMT_PREPARE:
return new MySQLComStmtPreparePacket(payload);
case COM_STMT_EXECUTE:
- MySQLPreparedStatement preparedStatement = MySQLPreparedStatementRegistry.getInstance()
- .getConnectionPreparedStatements(connectionId).get(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
+ MySQLPreparedStatement preparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(payload.getByteBuf().getIntLE(payload.getByteBuf().readerIndex()));
return new MySQLComStmtExecutePacket(payload, preparedStatement.getSqlStatement().getParameterCount());
case COM_STMT_RESET:
return new MySQLComStmtResetPacket(payload);
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
similarity index 72%
rename from shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
rename to shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
index 1b356b54413..9588539f3a2 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/command/query/binary/MySQLPreparedStatement.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLPreparedStatement.java
@@ -15,11 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary;
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementParameterType;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.util.Collections;
@@ -31,11 +34,13 @@ import java.util.List;
@RequiredArgsConstructor
@Getter
@Setter
-public final class MySQLPreparedStatement {
+public final class MySQLPreparedStatement implements PreparedStatement {
private final String sql;
private final SQLStatement sqlStatement;
+ private final SQLStatementContext<?> sqlStatementContext;
+
private List<MySQLPreparedStatementParameterType> parameterTypes = Collections.emptyList();
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java
new file mode 100644
index 00000000000..d43619c7a99
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGenerator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Statement ID generator for MySQL.
+ */
+@NoArgsConstructor(access = AccessLevel.NONE)
+public final class MySQLStatementIDGenerator {
+
+ private static final MySQLStatementIDGenerator INSTANCE = new MySQLStatementIDGenerator();
+
+ private final Map<Integer, AtomicInteger> connectionRegistry = new ConcurrentHashMap<>();
+
+ /**
+ * Get prepared statement registry instance.
+ *
+ * @return prepared statement registry instance
+ */
+ public static MySQLStatementIDGenerator getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Register connection.
+ *
+ * @param connectionId connection ID
+ */
+ public void registerConnection(final int connectionId) {
+ connectionRegistry.put(connectionId, new AtomicInteger());
+ }
+
+ /**
+ * Generate next statement ID for connection.
+ *
+ * @param connectionId connection ID
+ * @return generated statement ID for prepared statement
+ */
+ public int nextStatementId(final int connectionId) {
+ return connectionRegistry.get(connectionId).incrementAndGet();
+ }
+
+ /**
+ * Unregister connection.
+ *
+ * @param connectionId connection ID
+ */
+ public void unregisterConnection(final int connectionId) {
+ connectionRegistry.remove(connectionId);
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
index 9bf33030490..df9bb3962a4 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutor.java
@@ -18,10 +18,9 @@
package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry.MySQLConnectionPreparedStatements;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import java.util.Collection;
@@ -35,18 +34,11 @@ public final class MySQLComStmtCloseExecutor implements CommandExecutor {
private final MySQLComStmtClosePacket packet;
- private final int connectionId;
+ private final ConnectionSession connectionSession;
@Override
public Collection<DatabasePacket<?>> execute() {
- MySQLConnectionPreparedStatements connectionPreparedStatements = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionId);
- if (!connectionReleased(connectionPreparedStatements)) {
- connectionPreparedStatements.closeStatement(packet.getStatementId());
- }
+ connectionSession.getPreparedStatementRegistry().removePreparedStatement(packet.getStatementId());
return Collections.emptyList();
}
-
- private boolean connectionReleased(final MySQLConnectionPreparedStatements connectionPreparedStatements) {
- return null == connectionPreparedStatements;
- }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
index bd867db3ebe..5aaf00ef658 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutor.java
@@ -25,12 +25,10 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.aware.ParameterAware;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -53,6 +51,7 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFa
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
@@ -88,19 +87,21 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
MySQLPreparedStatement preparedStatement = updateAndGetPreparedStatement();
- String databaseName = connectionSession.getDatabaseName();
- MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
SQLStatement sqlStatement = preparedStatement.getSqlStatement();
if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
connectionSession.getBackendConnection().handleAutoCommit();
}
List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes());
- SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(), parameters,
- sqlStatement, connectionSession.getDefaultDatabaseName());
+ SQLStatementContext<?> sqlStatementContext = preparedStatement.getSqlStatementContext();
+ if (sqlStatementContext instanceof ParameterAware) {
+ ((ParameterAware) sqlStatementContext).setUpParameters(parameters);
+ }
// TODO optimize SQLStatementDatabaseHolder
if (sqlStatementContext instanceof TableAvailable) {
((TableAvailable) sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(SQLStatementDatabaseHolder::set);
}
+ String databaseName = connectionSession.getDatabaseName();
+ MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
SQLCheckEngine.check(sqlStatement, Collections.emptyList(), getRules(databaseName), databaseName, metaDataContexts.getMetaData().getDatabases(), connectionSession.getGrantee());
// TODO Refactor the following branch
if (sqlStatement instanceof TCLStatement) {
@@ -116,7 +117,7 @@ public final class MySQLComStmtExecuteExecutor implements QueryCommandExecutor {
}
private MySQLPreparedStatement updateAndGetPreparedStatement() {
- MySQLPreparedStatement result = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).get(packet.getStatementId());
+ MySQLPreparedStatement result = connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == packet.getNewParametersBoundFlag()) {
result.setParameterTypes(packet.getNewParameterTypes());
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
index 15cf078b362..99c39c12287 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutor.java
@@ -22,12 +22,13 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLComSetOptionPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -38,6 +39,7 @@ import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
@@ -70,7 +72,10 @@ public final class MySQLComStmtPrepareExecutor implements CommandExecutor {
throw new UnsupportedPreparedStatementException();
}
int projectionCount = getProjectionCount(sqlStatement);
- int statementId = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).prepareStatement(packet.getSql(), sqlStatement);
+ int statementId = MySQLStatementIDGenerator.getInstance().nextStatementId(connectionSession.getConnectionId());
+ SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getDatabases(),
+ sqlStatement, connectionSession.getDefaultDatabaseName());
+ connectionSession.getPreparedStatementRegistry().addPreparedStatement(statementId, new MySQLPreparedStatement(packet.getSql(), sqlStatement, sqlStatementContext));
return createPackets(statementId, projectionCount, sqlStatement.getParameterCount());
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
index 99ef170e35f..276eb511ff9 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandPacketFactoryTest.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.MySQLUns
import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.initdb.MySQLComInitDbPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.ping.MySQLComPingPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.admin.quit.MySQLComQuitPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
@@ -32,6 +31,9 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.r
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.fieldlist.MySQLComFieldListPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -49,35 +51,36 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class MySQLCommandPacketFactoryTest {
- private static final int CONNECTION_ID = 1;
-
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private MySQLPacketPayload payload;
+ @Mock
+ private ConnectionSession connectionSession;
+
@Test
public void assertNewInstanceWithComQuitPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT, payload, CONNECTION_ID), instanceOf(MySQLComQuitPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUIT, payload, connectionSession), instanceOf(MySQLComQuitPacket.class));
}
@Test
public void assertNewInstanceWithComInitDbPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB, payload, CONNECTION_ID), instanceOf(MySQLComInitDbPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_INIT_DB, payload, connectionSession), instanceOf(MySQLComInitDbPacket.class));
}
@Test
public void assertNewInstanceWithComFieldListPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST, payload, CONNECTION_ID), instanceOf(MySQLComFieldListPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_FIELD_LIST, payload, connectionSession), instanceOf(MySQLComFieldListPacket.class));
}
@Test
public void assertNewInstanceWithComQueryPacket() throws SQLException {
when(payload.readStringEOF()).thenReturn("SHOW TABLES");
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY, payload, CONNECTION_ID), instanceOf(MySQLComQueryPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_QUERY, payload, connectionSession), instanceOf(MySQLComQueryPacket.class));
}
@Test
public void assertNewInstanceWithComStmtPreparePacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE, payload, CONNECTION_ID), instanceOf(MySQLComStmtPreparePacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_PREPARE, payload, connectionSession), instanceOf(MySQLComStmtPreparePacket.class));
}
@Test
@@ -85,139 +88,139 @@ public final class MySQLCommandPacketFactoryTest {
when(payload.readInt1()).thenReturn(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST.getValue());
when(payload.readInt4()).thenReturn(1);
when(payload.getByteBuf().getIntLE(anyInt())).thenReturn(1);
- MySQLPreparedStatementRegistry.getInstance().registerConnection(CONNECTION_ID);
- MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(CONNECTION_ID).prepareStatement("SELECT * FROM t_order", new MySQLSelectStatement());
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE, payload, CONNECTION_ID), instanceOf(MySQLComStmtExecutePacket.class));
- MySQLPreparedStatementRegistry.getInstance().unregisterConnection(CONNECTION_ID);
+ PreparedStatementRegistry preparedStatementRegistry = new PreparedStatementRegistry();
+ when(connectionSession.getPreparedStatementRegistry()).thenReturn(preparedStatementRegistry);
+ preparedStatementRegistry.addPreparedStatement(1, new MySQLPreparedStatement("select 1", new MySQLSelectStatement(), null));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_EXECUTE, payload, connectionSession), instanceOf(MySQLComStmtExecutePacket.class));
}
@Test
public void assertNewInstanceWithComStmtClosePacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE, payload, CONNECTION_ID), instanceOf(MySQLComStmtClosePacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_CLOSE, payload, connectionSession), instanceOf(MySQLComStmtClosePacket.class));
}
@Test
public void assertNewInstanceWithComPingPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING, payload, CONNECTION_ID), instanceOf(MySQLComPingPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PING, payload, connectionSession), instanceOf(MySQLComPingPacket.class));
}
@Test
public void assertNewInstanceWithComSleepPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SLEEP, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComCreateDbPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CREATE_DB, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComDropDbPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DROP_DB, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComRefreshPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REFRESH, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComShutDownPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SHUTDOWN, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComStatisticsPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STATISTICS, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComProcessInfoPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_INFO, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComConnectPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComProcessKillPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_PROCESS_KILL, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComDebugPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DEBUG, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComTimePacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TIME, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComDelayedInsertPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DELAYED_INSERT, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComChangeUserPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CHANGE_USER, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComBinlogDumpPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComTableDumpPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_TABLE_DUMP, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComConnectOutPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_CONNECT_OUT, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComRegisterSlavePacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_REGISTER_SLAVE, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComStmtSendLongDataPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_SEND_LONG_DATA, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComStmtResetPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET, payload, CONNECTION_ID), instanceOf(MySQLComStmtResetPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_RESET, payload, connectionSession), instanceOf(MySQLComStmtResetPacket.class));
}
@Test
public void assertNewInstanceWithComSetOptionPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION, payload, CONNECTION_ID), instanceOf(MySQLComSetOptionPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_SET_OPTION, payload, connectionSession), instanceOf(MySQLComSetOptionPacket.class));
}
@Test
public void assertNewInstanceWithComStmtFetchPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_STMT_FETCH, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComDaemonPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_DAEMON, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComBinlogDumpGTIDPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_BINLOG_DUMP_GTID, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
@Test
public void assertNewInstanceWithComResetConnectionPacket() throws SQLException {
- assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION, payload, CONNECTION_ID), instanceOf(MySQLUnsupportedCommandPacket.class));
+ assertThat(MySQLCommandPacketFactory.newInstance(MySQLCommandPacketType.COM_RESET_CONNECTION, payload, connectionSession), instanceOf(MySQLUnsupportedCommandPacket.class));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java
new file mode 100644
index 00000000000..28f52a8d966
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/MySQLStatementIDGeneratorTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class MySQLStatementIDGeneratorTest {
+
+ private static final int CONNECTION_ID = 1;
+
+ @Before
+ public void setup() {
+ MySQLStatementIDGenerator.getInstance().registerConnection(CONNECTION_ID);
+ }
+
+ @Test
+ public void assertNextStatementId() {
+ assertThat(MySQLStatementIDGenerator.getInstance().nextStatementId(CONNECTION_ID), is(1));
+ assertThat(MySQLStatementIDGenerator.getInstance().nextStatementId(CONNECTION_ID), is(2));
+ }
+
+ @After
+ public void tearDown() {
+ MySQLStatementIDGenerator.getInstance().unregisterConnection(CONNECTION_ID);
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java
new file mode 100644
index 00000000000..a1c2da2706e
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/close/MySQLComStmtCloseExecutorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.close;
+
+import io.netty.buffer.Unpooled;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.close.MySQLComStmtClosePacket;
+import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public final class MySQLComStmtCloseExecutorTest {
+
+ @Test
+ public void assertExecute() {
+ MySQLComStmtClosePacket packet = new MySQLComStmtClosePacket(new MySQLPacketPayload(Unpooled.wrappedBuffer(new byte[]{0x01, 0x00, 0x00, 0x00}), StandardCharsets.UTF_8));
+ ConnectionSession connectionSession = mock(ConnectionSession.class, RETURNS_DEEP_STUBS);
+ assertThat(new MySQLComStmtCloseExecutor(packet, connectionSession).execute(), is(Collections.emptyList()));
+ verify(connectionSession.getPreparedStatementRegistry()).removePreparedStatement(1);
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
index eb875fe6104..c60ea2db6b1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/execute/MySQLComStmtExecuteExecutorTest.java
@@ -19,14 +19,19 @@ package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.exec
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
+import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLFieldCountPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext;
@@ -40,6 +45,8 @@ import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicati
import org.apache.shardingsphere.proxy.backend.communication.jdbc.JDBCDatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
+import org.apache.shardingsphere.proxy.backend.response.data.impl.BinaryQueryResponseCell;
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryHeader;
import org.apache.shardingsphere.proxy.backend.response.header.query.QueryResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -48,6 +55,7 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import org.apache.shardingsphere.proxy.frontend.mysql.ProxyContextRestorer;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.ColumnAssignmentSegment;
import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.assignment.SetAssignmentSegment;
import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
@@ -66,8 +74,10 @@ import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
@@ -75,6 +85,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
@@ -82,6 +93,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -106,15 +118,17 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
ProxyContext.init(contextManager);
- when(connectionSession.getConnectionId()).thenReturn(1);
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_GENERAL_CI);
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
when(connectionSession.getDatabaseName()).thenReturn("logic_db");
- when(connectionSession.getDefaultDatabaseName()).thenReturn("logic_db");
- MySQLPreparedStatementRegistry.getInstance().registerConnection(1);
- MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("select * from tbl where id = ?", prepareSelectStatement());
- MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("update tbl set col=1 where id = ?", prepareUpdateStatement());
- MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(1).prepareStatement("commit", new MySQLCommitStatement());
+ SQLStatementContext<?> selectStatementContext = prepareSelectStatementContext();
+ when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(1))
+ .thenReturn(new MySQLPreparedStatement("select * from tbl where id = ?", prepareSelectStatement(), selectStatementContext));
+ UpdateStatementContext updateStatementContext = mock(UpdateStatementContext.class, RETURNS_DEEP_STUBS);
+ when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(2))
+ .thenReturn(new MySQLPreparedStatement("update tbl set col=1 where id = ?", prepareUpdateStatement(), updateStatementContext));
+ when(connectionSession.getPreparedStatementRegistry().getPreparedStatement(3))
+ .thenReturn(new MySQLPreparedStatement("commit", new MySQLCommitStatement(), new CommonSQLStatementContext<>(new MySQLCommitStatement())));
}
private ShardingSphereDatabase mockDatabase() {
@@ -131,6 +145,12 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
return sqlStatement;
}
+ private SQLStatementContext<?> prepareSelectStatementContext() {
+ SelectStatementContext result = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
+ when(result.getTablesContext().getDatabaseName()).thenReturn(Optional.empty());
+ return result;
+ }
+
private MySQLUpdateStatement prepareUpdateStatement() {
MySQLUpdateStatement result = new MySQLUpdateStatement();
ColumnSegment columnSegment = new ColumnSegment(0, 0, new IdentifierValue("col"));
@@ -145,6 +165,8 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
when(packet.getStatementId()).thenReturn(1);
MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new MySQLComStmtExecuteExecutor(packet, connectionSession);
when(databaseCommunicationEngine.execute()).thenReturn(new QueryResponseHeader(Collections.singletonList(mock(QueryHeader.class))));
+ when(databaseCommunicationEngine.next()).thenReturn(true, false);
+ when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(new QueryResponseRow(Collections.singletonList(new BinaryQueryResponseCell(Types.INTEGER, 1))));
Iterator<DatabasePacket<?>> actual;
try (MockedStatic<DatabaseCommunicationEngineFactory> mockedStatic = mockStatic(DatabaseCommunicationEngineFactory.class, RETURNS_DEEP_STUBS)) {
mockedStatic.when(() -> DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(any(SQLStatementContext.class), anyString(), anyList(), eq(backendConnection)))
@@ -156,12 +178,19 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
assertThat(actual.next(), instanceOf(MySQLColumnDefinition41Packet.class));
assertThat(actual.next(), instanceOf(MySQLEofPacket.class));
assertFalse(actual.hasNext());
+ assertTrue(mysqlComStmtExecuteExecutor.next());
+ MySQLPacket actualQueryRowPacket = mysqlComStmtExecuteExecutor.getQueryRowPacket();
+ assertThat(actualQueryRowPacket, instanceOf(MySQLBinaryResultSetRowPacket.class));
+ assertThat(actualQueryRowPacket.getSequenceId(), is(4));
+ mysqlComStmtExecuteExecutor.close();
+ verify(databaseCommunicationEngine).close();
}
@Test
public void assertIsUpdateResponse() throws SQLException {
MySQLComStmtExecutePacket packet = mock(MySQLComStmtExecutePacket.class);
when(packet.getStatementId()).thenReturn(2);
+ when(packet.getNewParametersBoundFlag()).thenReturn(MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST);
MySQLComStmtExecuteExecutor mysqlComStmtExecuteExecutor = new MySQLComStmtExecuteExecutor(packet, connectionSession);
when(databaseCommunicationEngine.execute()).thenReturn(new UpdateResponseHeader(new MySQLUpdateStatement()));
Iterator<DatabasePacket<?>> actual;
@@ -191,5 +220,7 @@ public final class MySQLComStmtExecuteExecutorTest extends ProxyContextRestorer
assertThat(mysqlComStmtExecuteExecutor.getResponseType(), is(ResponseType.UPDATE));
assertThat(actual.next(), instanceOf(MySQLOKPacket.class));
assertFalse(actual.hasNext());
+ mysqlComStmtExecuteExecutor.close();
+ verify(textProtocolBackendHandler).close();
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
index 10f4ca924d2..ab5ecebff61 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/test/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/binary/prepare/MySQLComStmtPrepareExecutorTest.java
@@ -19,9 +19,30 @@ package org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.prep
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLCharacterSet;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.MySQLColumnDefinition41Packet;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLStatementIDGenerator;
+import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPrepareOKPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.prepare.MySQLComStmtPreparePacket;
+import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.UpdateStatementContext;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.parser.config.SQLParserRuleConfiguration;
+import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.backend.session.PreparedStatementRegistry;
import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
+import org.apache.shardingsphere.proxy.frontend.mysql.ProxyContextRestorer;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
+import org.apache.shardingsphere.sql.parser.api.CacheOption;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLSelectStatement;
+import org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLUpdateStatement;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
@@ -29,11 +50,18 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
+import java.util.Iterator;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class MySQLComStmtPrepareExecutorTest {
+public final class MySQLComStmtPrepareExecutorTest extends ProxyContextRestorer {
@Mock
private MySQLComStmtPreparePacket packet;
@@ -41,12 +69,73 @@ public final class MySQLComStmtPrepareExecutorTest {
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ConnectionSession connectionSession;
+ @Before
+ public void setup() {
+ ProxyContext.init(mock(ContextManager.class, RETURNS_DEEP_STUBS));
+ prepareSQLParser();
+ when(connectionSession.getPreparedStatementRegistry()).thenReturn(new PreparedStatementRegistry());
+ when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI);
+ }
+
+ private void prepareSQLParser() {
+ ContextManager contextManager = ProxyContext.getInstance().getContextManager();
+ MetaDataContexts metaDataContexts = contextManager.getMetaDataContexts();
+ when(metaDataContexts.getMetaData().getGlobalRuleMetaData()).thenReturn(mock(ShardingSphereRuleMetaData.class));
+ CacheOption cacheOption = new CacheOption(1024, 1024);
+ when(metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class))
+ .thenReturn(new SQLParserRule(new SQLParserRuleConfiguration(false, cacheOption, cacheOption)));
+ when(metaDataContexts.getMetaData().getDatabases().get(connectionSession.getDatabaseName()).getProtocolType()).thenReturn(new MySQLDatabaseType());
+ }
+
@Test(expected = UnsupportedPreparedStatementException.class)
public void assertPrepareMultiStatements() throws SQLException {
when(packet.getSql()).thenReturn("update t set v=v+1 where id=1;update t set v=v+1 where id=2;update t set v=v+1 where id=3");
- when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get()).thenReturn(MySQLCharacterSet.UTF8MB4_UNICODE_CI);
when(connectionSession.getAttributeMap().hasAttr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS)).thenReturn(true);
when(connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_OPTION_MULTI_STATEMENTS).get()).thenReturn(0);
new MySQLComStmtPrepareExecutor(packet, connectionSession).execute();
}
+
+ @Test
+ public void assertPrepareSelectStatement() throws SQLException {
+ String sql = "select name from t where id = ?";
+ when(packet.getSql()).thenReturn(sql);
+ when(connectionSession.getConnectionId()).thenReturn(1);
+ MySQLStatementIDGenerator.getInstance().registerConnection(1);
+ Iterator<DatabasePacket<?>> actualIterator = new MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
+ assertThat(actualIterator.next(), instanceOf(MySQLComStmtPrepareOKPacket.class));
+ assertThat(actualIterator.next(), instanceOf(MySQLColumnDefinition41Packet.class));
+ assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+ assertThat(actualIterator.next(), instanceOf(MySQLColumnDefinition41Packet.class));
+ assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+ assertFalse(actualIterator.hasNext());
+ MySQLPreparedStatement actualPreparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(1);
+ assertThat(actualPreparedStatement.getSql(), is(sql));
+ assertThat(actualPreparedStatement.getSqlStatement(), instanceOf(MySQLSelectStatement.class));
+ assertThat(actualPreparedStatement.getSqlStatementContext(), instanceOf(SelectStatementContext.class));
+ MySQLStatementIDGenerator.getInstance().unregisterConnection(1);
+ }
+
+ @Test
+ public void assertPrepareUpdateStatement() throws SQLException {
+ String sql = "update t set v = ?";
+ when(packet.getSql()).thenReturn(sql);
+ when(connectionSession.getConnectionId()).thenReturn(1);
+ MySQLStatementIDGenerator.getInstance().registerConnection(1);
+ Iterator<DatabasePacket<?>> actualIterator = new MySQLComStmtPrepareExecutor(packet, connectionSession).execute().iterator();
+ assertThat(actualIterator.next(), instanceOf(MySQLComStmtPrepareOKPacket.class));
+ assertThat(actualIterator.next(), instanceOf(MySQLColumnDefinition41Packet.class));
+ assertThat(actualIterator.next(), instanceOf(MySQLEofPacket.class));
+ assertFalse(actualIterator.hasNext());
+ MySQLPreparedStatement actualPreparedStatement = connectionSession.getPreparedStatementRegistry().getPreparedStatement(1);
+ assertThat(actualPreparedStatement.getSql(), is(sql));
+ assertThat(actualPreparedStatement.getSqlStatement(), instanceOf(MySQLUpdateStatement.class));
+ assertThat(actualPreparedStatement.getSqlStatementContext(), instanceOf(UpdateStatementContext.class));
+ MySQLStatementIDGenerator.getInstance().unregisterConnection(1);
+ }
+
+ @Test(expected = UnsupportedPreparedStatementException.class)
+ public void assertPrepareNotAllowedStatement() throws SQLException {
+ when(packet.getSql()).thenReturn("begin");
+ new MySQLComStmtPrepareExecutor(packet, connectionSession).execute();
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
index 3052f9847ee..48880226557 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/reactive/mysql/command/query/binary/execute/ReactiveMySQLComStmtExecuteExecutor.java
@@ -27,13 +27,11 @@ import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnTyp
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLConstants;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLNewParametersBoundFlag;
import org.apache.shardingsphere.db.protocol.mysql.packet.MySQLPacket;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatement;
-import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.MySQLPreparedStatementRegistry;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLBinaryResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.binary.execute.MySQLComStmtExecutePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
+import org.apache.shardingsphere.infra.binder.aware.ParameterAware;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -54,6 +52,7 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
import org.apache.shardingsphere.proxy.frontend.mysql.command.ServerStatusFlagCalculator;
+import org.apache.shardingsphere.proxy.frontend.mysql.command.query.binary.MySQLPreparedStatement;
import org.apache.shardingsphere.proxy.frontend.mysql.command.query.builder.ResponsePacketBuilder;
import org.apache.shardingsphere.proxy.frontend.reactive.command.executor.ReactiveCommandExecutor;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
@@ -90,16 +89,18 @@ public final class ReactiveMySQLComStmtExecuteExecutor implements ReactiveComman
@Override
public Future<Collection<DatabasePacket<?>>> executeFuture() {
MySQLPreparedStatement preparedStatement = updateAndGetPreparedStatement();
- String databaseName = connectionSession.getDatabaseName();
- MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
- SQLStatement sqlStatement = preparedStatement.getSqlStatement();
List<Object> parameters = packet.readParameters(preparedStatement.getParameterTypes());
- SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData().getDatabases(), parameters,
- sqlStatement, connectionSession.getDefaultDatabaseName());
+ SQLStatementContext<?> sqlStatementContext = preparedStatement.getSqlStatementContext();
+ if (sqlStatementContext instanceof ParameterAware) {
+ ((ParameterAware) sqlStatementContext).setUpParameters(parameters);
+ }
// TODO optimize SQLStatementDatabaseHolder
if (sqlStatementContext instanceof TableAvailable) {
((TableAvailable) sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(SQLStatementDatabaseHolder::set);
}
+ SQLStatement sqlStatement = preparedStatement.getSqlStatement();
+ String databaseName = connectionSession.getDatabaseName();
+ MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
SQLCheckEngine.check(sqlStatement, Collections.emptyList(), getRules(databaseName), databaseName, metaDataContexts.getMetaData().getDatabases(), connectionSession.getGrantee());
int characterSet = connectionSession.getAttributeMap().attr(MySQLConstants.MYSQL_CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
// TODO Refactor the following branch
@@ -130,7 +131,7 @@ public final class ReactiveMySQLComStmtExecuteExecutor implements ReactiveComman
}
private MySQLPreparedStatement updateAndGetPreparedStatement() {
- MySQLPreparedStatement result = MySQLPreparedStatementRegistry.getInstance().getConnectionPreparedStatements(connectionSession.getConnectionId()).get(packet.getStatementId());
+ MySQLPreparedStatement result = connectionSession.getPreparedStatementRegistry().getPreparedStatement(packet.getStatementId());
if (MySQLNewParametersBoundFlag.PARAMETER_TYPE_EXIST == packet.getNewParametersBoundFlag()) {
result.setParameterTypes(packet.getNewParameterTypes());
}