You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/06/09 10:43:31 UTC
[shardingsphere] branch master updated: Refactor PostgreSQL Proxy
(#10736)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new be59cca Refactor PostgreSQL Proxy (#10736)
be59cca is described below
commit be59cca18523e6b2955c665f81a7ce64e834ef08
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Wed Jun 9 18:43:06 2021 +0800
Refactor PostgreSQL Proxy (#10736)
* Refactor PostgreSQL Proxy
* Fix empty SQL cause NPE
* Fix empty SQL parsing error
* Fix checkstyle
* Fix PostgreSQLComDescribeExecutorTest
* Refactor PostgreSQL Proxy protocol
* Fix no data packet may be missing
* Fix get Optional class mistake
* Fix SQL command not found
* Fix PostgreSQL frontend test cases
* Add PostgreSQLNoDataPacketTest
* Replace ThreadLocal with ConnectionContextRegistry
* Fix checkstyle
* Fix test case
* Polish test case
---
.../query/PostgreSQLNoDataPacket.java} | 28 ++------
.../generic/PostgreSQLCommandCompletePacket.java | 4 --
.../command/query/PostgreSQLNoDataPacketTest.java | 14 ++--
.../frontend/command/CommandExecutorTask.java | 2 +-
.../netty/FrontendChannelInboundHandler.java | 4 +-
.../state/impl/CircuitBreakProxyState.java | 2 +-
.../frontend/command/CommandExecutorTaskTest.java | 2 +-
.../mysql/command/MySQLCommandExecuteEngine.java | 2 +-
.../command/PostgreSQLCommandExecuteEngine.java | 51 +++++++++-----
.../command/PostgreSQLCommandExecutorFactory.java | 24 ++++---
.../command/PostgreSQLConnectionContext.java | 69 +++++++++++++++++++
.../PostgreSQLConnectionContextRegistry.java | 64 ++++++++++++++++++
.../PostgreSQLUnsupportedCommandExecutor.java | 6 ++
.../command/query/PostgreSQLCommand.java | 6 +-
.../binary/bind/PostgreSQLComBindExecutor.java | 75 +++++++++++---------
.../describe/PostgreSQLComDescribeExecutor.java | 13 +++-
.../execute/PostgreSQLComExecuteExecutor.java | 79 ++++++++++++++++++++--
.../binary/parse/PostgreSQLComParseExecutor.java | 35 +++++-----
.../binary/sync/PostgreSQLComSyncExecutor.java | 4 ++
.../query/text/PostgreSQLComQueryExecutor.java | 4 +-
.../PostgreSQLCommandExecuteEngineTest.java | 19 ++++--
.../PostgreSQLCommandExecutorFactoryTest.java | 39 +++++++++--
.../PostgreSQLUnsupportedCommandExecutorTest.java | 6 +-
.../binary/bind/PostgreSQLComBindExecutorTest.java | 51 ++++++++------
.../PostgreSQLComDescribeExecutorTest.java | 32 +++++++--
.../execute/PostgreSQLComExecuteExecutorTest.java | 66 +++++++++++++++++-
.../parse/PostgreSQLComParseExecutorTest.java | 10 ++-
.../binary/sync/PostgreSQLComSyncExecutorTest.java | 10 ++-
.../frontend/command/CommandExecuteEngine.java | 3 +-
29 files changed, 555 insertions(+), 169 deletions(-)
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLCommandCompletePacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLNoDataPacket.java
similarity index 63%
copy from shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLCommandCompletePacket.java
copy to shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLNoDataPacket.java
index 7ea2dd6..356b6a9 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLCommandCompletePacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLNoDataPacket.java
@@ -15,44 +15,24 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.db.protocol.postgresql.packet.generic;
+package org.apache.shardingsphere.db.protocol.postgresql.packet.command.query;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
/**
- * Command complete packet for PostgreSQL.
+ * No data packet for PostgreSQL.
*/
-@RequiredArgsConstructor
-public final class PostgreSQLCommandCompletePacket implements PostgreSQLIdentifierPacket {
-
- private final String sqlCommand;
-
- private final long rowCount;
-
- public PostgreSQLCommandCompletePacket() {
- this("", 0);
- }
+public final class PostgreSQLNoDataPacket implements PostgreSQLIdentifierPacket {
@Override
public void write(final PostgreSQLPacketPayload payload) {
- switch (sqlCommand) {
- case "BEGIN":
- case "COMMIT":
- case "ROLLBACK":
- payload.writeStringNul(sqlCommand);
- return;
- default:
- }
- String delimiter = "INSERT".equals(sqlCommand) ? " 0 " : " ";
- payload.writeStringNul(String.join(delimiter, sqlCommand, Long.toString(rowCount)));
}
@Override
public PostgreSQLIdentifierTag getIdentifier() {
- return PostgreSQLMessagePacketType.COMMAND_COMPLETE;
+ return PostgreSQLMessagePacketType.NO_DATA;
}
}
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLCommandCompletePacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLCommandCompletePacket.java
index 7ea2dd6..510a2ce 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLCommandCompletePacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLCommandCompletePacket.java
@@ -33,10 +33,6 @@ public final class PostgreSQLCommandCompletePacket implements PostgreSQLIdentifi
private final long rowCount;
- public PostgreSQLCommandCompletePacket() {
- this("", 0);
- }
-
@Override
public void write(final PostgreSQLPacketPayload payload) {
switch (sqlCommand) {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutorTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLNoDataPacketTest.java
similarity index 64%
copy from shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutorTest.java
copy to shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLNoDataPacketTest.java
index 453e7c2..31efe35 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutorTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/query/PostgreSQLNoDataPacketTest.java
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe;
+package org.apache.shardingsphere.db.protocol.postgresql.packet.command.query;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
import org.junit.Test;
-import java.util.Collections;
-
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class PostgreSQLComDescribeExecutorTest {
+public final class PostgreSQLNoDataPacketTest {
@Test
- public void assertNewInstance() {
- PostgreSQLComDescribeExecutor actual = new PostgreSQLComDescribeExecutor();
- assertThat(actual.execute(), is(Collections.emptyList()));
+ public void assertIdentifier() {
+ PostgreSQLIdentifierTag actual = new PostgreSQLNoDataPacket().getIdentifier();
+ assertThat(actual, is(PostgreSQLMessagePacketType.NO_DATA));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 4f50eb0..e92d22f 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -105,7 +105,7 @@ public final class CommandExecutorTask implements Runnable {
if (!ExpectedExceptions.isExpected(cause.getClass())) {
log.error("Exception occur: ", cause);
}
- context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(cause));
+ context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(cause, backendConnection));
Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(backendConnection);
databasePacket.ifPresent(context::writeAndFlush);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index 6084b2c..d4d2984 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -22,8 +22,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.authentication.AuthenticationResult;
@@ -79,7 +79,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
} catch (final Exception ex) {
// CHECKSTYLE:ON
log.error("Exception occur: ", ex);
- context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex));
+ context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex, backendConnection));
context.close();
}
return false;
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java
index 8fe4aab..62249be 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/CircuitBreakProxyState.java
@@ -33,7 +33,7 @@ public final class CircuitBreakProxyState implements ProxyState {
@Override
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
- context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new CircuitBreakException()));
+ context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new CircuitBreakException(), backendConnection));
Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(backendConnection);
databasePacket.ifPresent(context::writeAndFlush);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 4707c3b..822d715 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -166,7 +166,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.getConnectionStatus()).thenThrow(mockException);
when(codecEngine.createPacketPayload(message)).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
- when(executeEngine.getErrorPacket(eq(mockException))).thenReturn(databasePacket);
+ when(executeEngine.getErrorPacket(mockException, backendConnection)).thenReturn(databasePacket);
when(executeEngine.getOtherPacket(backendConnection)).thenReturn(Optional.of(databasePacket));
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
index 9495e28..37d8131 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/MySQLCommandExecuteEngine.java
@@ -61,7 +61,7 @@ public final class MySQLCommandExecuteEngine implements CommandExecuteEngine {
}
@Override
- public DatabasePacket<?> getErrorPacket(final Exception cause) {
+ public DatabasePacket<?> getErrorPacket(final Exception cause, final BackendConnection backendConnection) {
return MySQLErrPacketFactory.newInstance(cause);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index cd11412..b73bd9d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQ
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketFactory;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketTypeLoader;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLReadyForQueryPacket;
import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
@@ -36,7 +37,7 @@ import org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
-import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind.PostgreSQLComBindExecutor;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.sync.PostgreSQLComSyncExecutor;
import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text.PostgreSQLComQueryExecutor;
import org.apache.shardingsphere.proxy.frontend.postgresql.err.PostgreSQLErrPacketFactory;
@@ -61,11 +62,13 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
@Override
public CommandExecutor getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) throws SQLException {
- return PostgreSQLCommandExecutorFactory.newInstance((PostgreSQLCommandPacketType) type, (PostgreSQLCommandPacket) packet, backendConnection);
+ PostgreSQLConnectionContext connectionContext = PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+ return PostgreSQLCommandExecutorFactory.newInstance((PostgreSQLCommandPacketType) type, (PostgreSQLCommandPacket) packet, backendConnection, connectionContext);
}
@Override
- public DatabasePacket<?> getErrorPacket(final Exception cause) {
+ public DatabasePacket<?> getErrorPacket(final Exception cause, final BackendConnection backendConnection) {
+ PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId()).getPendingExecutors().clear();
return PostgreSQLErrPacketFactory.newInstance(cause);
}
@@ -81,35 +84,49 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
return true;
}
if (ResponseType.QUERY == queryCommandExecutor.getResponseType() && !context.channel().isActive()) {
- context.write(new PostgreSQLCommandCompletePacket());
+ context.write(new PostgreSQLCommandCompletePacket(PostgreSQLCommand.SELECT.name(), 0));
return true;
}
- if (ResponseType.UPDATE == queryCommandExecutor.getResponseType() && !(queryCommandExecutor instanceof PostgreSQLComBindExecutor)) {
+ if (queryCommandExecutor instanceof PostgreSQLComQueryExecutor) {
+ return processSimpleQuery(context, backendConnection, (PostgreSQLComQueryExecutor) queryCommandExecutor);
+ }
+ writeDataPackets(context, backendConnection, queryCommandExecutor);
+ return false;
+ }
+
+ private boolean processSimpleQuery(final ChannelHandlerContext context, final BackendConnection backendConnection, final PostgreSQLComQueryExecutor queryExecutor) throws SQLException {
+ if (ResponseType.UPDATE == queryExecutor.getResponseType()) {
context.write(new PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
return true;
}
- int count = 0;
+ long dataRows = writeDataPackets(context, backendConnection, queryExecutor);
+ if (ResponseType.QUERY == queryExecutor.getResponseType()) {
+ context.write(new PostgreSQLCommandCompletePacket(PostgreSQLCommand.SELECT.name(), dataRows));
+ }
+ context.write(new PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
+ return true;
+ }
+
+ private long writeDataPackets(final ChannelHandlerContext context, final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor) throws SQLException {
+ long dataRows = 0;
+ int flushCount = 0;
int proxyFrontendFlushThreshold = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
while (queryCommandExecutor.next()) {
- count++;
+ flushCount++;
while (!context.channel().isWritable() && context.channel().isActive()) {
context.flush();
backendConnection.getResourceLock().doAwait();
}
DatabasePacket<?> resultValue = queryCommandExecutor.getQueryRowPacket();
context.write(resultValue);
- if (proxyFrontendFlushThreshold == count) {
+ if (proxyFrontendFlushThreshold == flushCount) {
context.flush();
- count = 0;
+ flushCount = 0;
+ }
+ if (resultValue instanceof PostgreSQLDataRowPacket) {
+ dataRows++;
}
}
- if (ResponseType.QUERY == queryCommandExecutor.getResponseType()) {
- context.write(new PostgreSQLCommandCompletePacket());
- }
- if (queryCommandExecutor instanceof PostgreSQLComQueryExecutor) {
- context.write(new PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
- return true;
- }
- return false;
+ return dataRows;
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
index 6860547..a3a3abc 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactory.java
@@ -39,6 +39,7 @@ import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.
import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text.PostgreSQLComQueryExecutor;
import java.sql.SQLException;
+import java.util.Collections;
/**
* Command executor factory for PostgreSQL.
@@ -53,31 +54,36 @@ public final class PostgreSQLCommandExecutorFactory {
* @param commandPacketType command packet type for PostgreSQL
* @param commandPacket command packet for PostgreSQL
* @param backendConnection backend connection
+ * @param connectionContext PostgreSQL connection context
* @return command executor
* @throws SQLException SQL exception
*/
- public static CommandExecutor newInstance(final PostgreSQLCommandPacketType commandPacketType,
- final PostgreSQLCommandPacket commandPacket, final BackendConnection backendConnection) throws SQLException {
+ public static CommandExecutor newInstance(final PostgreSQLCommandPacketType commandPacketType, final PostgreSQLCommandPacket commandPacket,
+ final BackendConnection backendConnection, final PostgreSQLConnectionContext connectionContext) throws SQLException {
log.debug("Execute packet type: {}, value: {}", commandPacketType, commandPacket);
switch (commandPacketType) {
case SIMPLE_QUERY:
return new PostgreSQLComQueryExecutor((PostgreSQLComQueryPacket) commandPacket, backendConnection);
case PARSE_COMMAND:
- return new PostgreSQLComParseExecutor((PostgreSQLComParsePacket) commandPacket, backendConnection);
+ return new PostgreSQLComParseExecutor(connectionContext, (PostgreSQLComParsePacket) commandPacket, backendConnection);
case BIND_COMMAND:
- return new PostgreSQLComBindExecutor((PostgreSQLComBindPacket) commandPacket, backendConnection);
+ connectionContext.getPendingExecutors().add(new PostgreSQLComBindExecutor(connectionContext, (PostgreSQLComBindPacket) commandPacket, backendConnection));
+ break;
case DESCRIBE_COMMAND:
- return new PostgreSQLComDescribeExecutor();
+ connectionContext.getPendingExecutors().add(new PostgreSQLComDescribeExecutor(connectionContext));
+ break;
case EXECUTE_COMMAND:
- return new PostgreSQLComExecuteExecutor();
+ return new PostgreSQLComExecuteExecutor(connectionContext);
case SYNC_COMMAND:
- return new PostgreSQLComSyncExecutor(backendConnection);
+ return new PostgreSQLComSyncExecutor(connectionContext, backendConnection);
case CLOSE_COMMAND:
- return new PostgreSQLComCloseExecutor((PostgreSQLComClosePacket) commandPacket, backendConnection);
+ connectionContext.getPendingExecutors().add(new PostgreSQLComCloseExecutor((PostgreSQLComClosePacket) commandPacket, backendConnection));
+ break;
case TERMINATE:
return new PostgreSQLComTerminationExecutor();
default:
- return new PostgreSQLUnsupportedCommandExecutor();
+ return new PostgreSQLUnsupportedCommandExecutor(connectionContext);
}
+ return Collections::emptyList;
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
new file mode 100644
index 0000000..43a01ed
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.postgresql.command;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Optional;
+
+/**
+ * PostgreSQL connection context.
+ */
+@Getter
+@Setter
+public final class PostgreSQLConnectionContext {
+
+ private final Collection<CommandExecutor> pendingExecutors = new LinkedList<>();
+
+ private SQLStatement sqlStatement;
+
+ private long updateCount;
+
+ /**
+ * Get describe command executor.
+ *
+ * @return describe command executor
+ */
+ public Optional<PostgreSQLComDescribeExecutor> getDescribeExecutor() {
+ return pendingExecutors.stream().filter(PostgreSQLComDescribeExecutor.class::isInstance).map(PostgreSQLComDescribeExecutor.class::cast).findFirst();
+ }
+
+ /**
+ * Get SQL statement.
+ *
+ * @return SQL statement
+ */
+ public Optional<SQLStatement> getSqlStatement() {
+ return Optional.ofNullable(sqlStatement);
+ }
+
+ /**
+ * Clear context.
+ */
+ public void clearContext() {
+ pendingExecutors.clear();
+ sqlStatement = null;
+ updateCount = 0;
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContextRegistry.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContextRegistry.java
new file mode 100644
index 0000000..81f06a2
--- /dev/null
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContextRegistry.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.postgresql.command;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * PostgreSQL connection context registry.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PostgreSQLConnectionContextRegistry {
+
+ private static final PostgreSQLConnectionContextRegistry INSTANCE = new PostgreSQLConnectionContextRegistry();
+
+ private final ConcurrentMap<Integer, PostgreSQLConnectionContext> connectionContexts = new ConcurrentHashMap<>(1024);
+
+ /**
+ * Get instance of PostgreSQL connection context registry.
+ *
+ * @return instance of PostgreSQL connection context registry.
+ */
+ public static PostgreSQLConnectionContextRegistry getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Get PostgreSQL connection context.
+ *
+ * @param connectionId backend connection id
+ * @return PostgreSQL connection context
+ */
+ public PostgreSQLConnectionContext get(final int connectionId) {
+ return connectionContexts.computeIfAbsent(connectionId, unused -> new PostgreSQLConnectionContext());
+ }
+
+ /**
+ * Remove PostgreSQL connection context.
+ *
+ * @param connectionId backend connection id
+ * @return Removed PostgreSQL connection context
+ */
+ public PostgreSQLConnectionContext remove(final int connectionId) {
+ return connectionContexts.remove(connectionId);
+ }
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutor.java
index cd90feb..13e2170 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutor.java
@@ -17,11 +17,13 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.generic;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLErrorCode;
import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLMessageSeverityLevel;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import java.util.Collection;
import java.util.Collections;
@@ -29,10 +31,14 @@ import java.util.Collections;
/**
* Unsupported command executor for PostgreSQL.
*/
+@RequiredArgsConstructor
public final class PostgreSQLUnsupportedCommandExecutor implements CommandExecutor {
+ private final PostgreSQLConnectionContext connectionContext;
+
@Override
public Collection<DatabasePacket<?>> execute() {
+ connectionContext.clearContext();
// TODO consider what severity and error code to use
PostgreSQLErrorResponsePacket packet = PostgreSQLErrorResponsePacket.newBuilder(PostgreSQLMessageSeverityLevel.ERROR, PostgreSQLErrorCode.FEATURE_NOT_SUPPORTED,
PostgreSQLErrorCode.FEATURE_NOT_SUPPORTED.getConditionName()).build();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
index d5684df..344e0cc 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
@@ -20,10 +20,12 @@ package org.apache.shardingsphere.proxy.frontend.postgresql.command.query;
import org.apache.shardingsphere.distsql.parser.statement.rdl.create.impl.AddResourceStatement;
import org.apache.shardingsphere.distsql.parser.statement.rdl.create.impl.CreateShardingTableRuleStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.SetStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.CreateDatabaseStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DropDatabaseStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.BeginTransactionStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
@@ -38,6 +40,7 @@ import java.util.Optional;
*/
public enum PostgreSQLCommand {
+ SELECT(SelectStatement.class),
INSERT(InsertStatement.class),
UPDATE(UpdateStatement.class),
DELETE(DeleteStatement.class),
@@ -45,7 +48,8 @@ public enum PostgreSQLCommand {
DROP(DropDatabaseStatement.class),
BEGIN(BeginTransactionStatement.class),
COMMIT(CommitStatement.class),
- ROLLBACK(RollbackStatement.class);
+ ROLLBACK(RollbackStatement.class),
+ SET(SetStatement.class);
private final Collection<Class<? extends SQLStatement>> sqlStatementClasses;
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
index e7039c7..a815f31 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutor.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.binary.BinaryCell;
import org.apache.shardingsphere.db.protocol.binary.BinaryRow;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
@@ -29,7 +30,6 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.bin
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
@@ -46,8 +46,9 @@ import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandlerFactory;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
-import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
import java.sql.SQLException;
@@ -60,60 +61,74 @@ import java.util.stream.Collectors;
/**
* Command bind executor for PostgreSQL.
*/
+@RequiredArgsConstructor
public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
+ private final PostgreSQLConnectionContext connectionContext;
+
private final PostgreSQLComBindPacket packet;
- private final DatabaseCommunicationEngine databaseCommunicationEngine;
+ private final BackendConnection backendConnection;
+
+ private DatabaseCommunicationEngine databaseCommunicationEngine;
- private final TextProtocolBackendHandler textProtocolBackendHandler;
+ private TextProtocolBackendHandler textProtocolBackendHandler;
@Getter
private volatile ResponseType responseType;
- public PostgreSQLComBindExecutor(final PostgreSQLComBindPacket packet, final BackendConnection backendConnection) throws SQLException {
- this.packet = packet;
- if (null == packet.getSql()) {
- databaseCommunicationEngine = null;
- textProtocolBackendHandler = null;
- return;
- }
- ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(DatabaseTypeRegistry.getTrunkDatabaseTypeName(
- ProxyContext.getInstance().getMetaDataContexts().getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType()));
- SQLStatement sqlStatement = sqlStatementParserEngine.parse(packet.getSql(), true);
- if (sqlStatement instanceof TCLStatement) {
- textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"), packet.getSql(), backendConnection);
- databaseCommunicationEngine = null;
- return;
- }
- textProtocolBackendHandler = null;
- databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement, packet.getSql(), packet.getParameters(), backendConnection);
- }
-
@Override
public Collection<DatabasePacket<?>> execute() throws SQLException {
+ init();
List<DatabasePacket<?>> result = new LinkedList<>();
result.add(new PostgreSQLBindCompletePacket());
if (null == databaseCommunicationEngine && null == textProtocolBackendHandler) {
return result;
}
ResponseHeader responseHeader = null != databaseCommunicationEngine ? databaseCommunicationEngine.execute() : textProtocolBackendHandler.execute();
- if (responseHeader instanceof QueryResponseHeader) {
- createQueryPacket((QueryResponseHeader) responseHeader).ifPresent(result::add);
+ if (responseHeader instanceof QueryResponseHeader && connectionContext.getDescribeExecutor().isPresent()) {
+ getRowDescriptionPacket((QueryResponseHeader) responseHeader)
+ .ifPresent(rowDescriptionPacket -> connectionContext.getDescribeExecutor().get().setRowDescriptionPacket(rowDescriptionPacket));
}
if (responseHeader instanceof UpdateResponseHeader) {
responseType = ResponseType.UPDATE;
- result.add(createUpdatePacket((UpdateResponseHeader) responseHeader));
+ connectionContext.setUpdateCount(((UpdateResponseHeader) responseHeader).getUpdateCount());
}
return result;
}
- private Optional<PostgreSQLRowDescriptionPacket> createQueryPacket(final QueryResponseHeader queryResponseHeader) {
- Collection<PostgreSQLColumnDescription> columnDescriptions = createColumnDescriptions(queryResponseHeader);
+ private void init() throws SQLException {
+ SQLStatement sqlStatement = getSqlStatement();
+ if (sqlStatement instanceof TCLStatement || sqlStatement instanceof EmptyStatement) {
+ textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(DatabaseTypeRegistry.getActualDatabaseType("PostgreSQL"), packet.getSql(), backendConnection);
+ return;
+ }
+ databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement, packet.getSql(), packet.getParameters(), backendConnection);
+ }
+
+ private SQLStatement getSqlStatement() {
+ return connectionContext.getSqlStatement().orElseGet(() -> {
+ SQLStatement result = parseSql(packet.getSql(), backendConnection.getSchemaName());
+ connectionContext.setSqlStatement(result);
+ return result;
+ });
+ }
+
+ private SQLStatement parseSql(final String sql, final String schemaName) {
+ if (sql.isEmpty()) {
+ return new EmptyStatement();
+ }
+ ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(
+ DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
+ return sqlStatementParserEngine.parse(sql, true);
+ }
+
+ private Optional<PostgreSQLRowDescriptionPacket> getRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) {
if (packet.isBinaryRowData()) {
return Optional.empty();
}
responseType = ResponseType.QUERY;
+ Collection<PostgreSQLColumnDescription> columnDescriptions = createColumnDescriptions(queryResponseHeader);
return Optional.of(new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions));
}
@@ -126,10 +141,6 @@ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor {
return result;
}
- private PostgreSQLCommandCompletePacket createUpdatePacket(final UpdateResponseHeader updateResponseHeader) {
- return new PostgreSQLCommandCompletePacket(PostgreSQLCommand.valueOf(updateResponseHeader.getSqlStatement().getClass()).map(Enum::name).orElse(""), updateResponseHeader.getUpdateCount());
- }
-
@Override
public boolean next() throws SQLException {
return null != databaseCommunicationEngine && databaseCommunicationEngine.next();
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutor.java
index a1023a9..f224dbd 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutor.java
@@ -17,8 +17,13 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLNoDataPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import java.util.Collection;
import java.util.Collections;
@@ -26,10 +31,16 @@ import java.util.Collections;
/**
* Command describe for PostgreSQL.
*/
+@RequiredArgsConstructor
+@Setter
public final class PostgreSQLComDescribeExecutor implements CommandExecutor {
+ private final PostgreSQLConnectionContext connectionContext;
+
+ private PostgreSQLRowDescriptionPacket rowDescriptionPacket;
+
@Override
public Collection<DatabasePacket<?>> execute() {
- return Collections.emptyList();
+ return null == rowDescriptionPacket ? Collections.singletonList(new PostgreSQLNoDataPacket()) : Collections.singletonList(rowDescriptionPacket);
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
index 636786e..053f318 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
@@ -17,19 +17,90 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.execute;
+import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.PostgreSQLCommand;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
+import java.sql.SQLException;
import java.util.Collection;
-import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Optional;
/**
* Command execute executor for PostgreSQL.
*/
-public final class PostgreSQLComExecuteExecutor implements CommandExecutor {
+@RequiredArgsConstructor
+public final class PostgreSQLComExecuteExecutor implements QueryCommandExecutor {
+
+ private final PostgreSQLConnectionContext connectionContext;
+
+ private final Collection<QueryCommandExecutor> queryCommandExecutors = new LinkedList<>();
+
+ private boolean commandComplete;
+
+ @Override
+ public Collection<DatabasePacket<?>> execute() throws SQLException {
+ Collection<DatabasePacket<?>> result = new LinkedList<>();
+ for (CommandExecutor each : connectionContext.getPendingExecutors()) {
+ if (each instanceof QueryCommandExecutor) {
+ queryCommandExecutors.add((QueryCommandExecutor) each);
+ }
+ result.addAll(each.execute());
+ }
+ connectionContext.getPendingExecutors().clear();
+ return result;
+ }
+
+ @Override
+ public ResponseType getResponseType() {
+ return ResponseType.QUERY;
+ }
+
+ @Override
+ public boolean next() throws SQLException {
+ return !commandComplete;
+ }
@Override
- public Collection<DatabasePacket<?>> execute() {
- return Collections.emptyList();
+ public DatabasePacket<?> getQueryRowPacket() throws SQLException {
+ Optional<DatabasePacket<?>> result = getPacketFromQueryCommandExecutors();
+ if (result.isPresent()) {
+ return result.get();
+ }
+ return createCommandCompletePacket();
+ }
+
+ private PostgreSQLIdentifierPacket createCommandCompletePacket() {
+ commandComplete = true;
+ if (connectionContext.getSqlStatement().map(EmptyStatement.class::isInstance).orElse(false)) {
+ return new PostgreSQLEmptyQueryResponsePacket();
+ }
+ String sqlCommand = connectionContext.getSqlStatement().map(SQLStatement::getClass).map(PostgreSQLCommand::valueOf).map(command -> command.map(Enum::name).orElse("")).orElse("");
+ PostgreSQLCommandCompletePacket result = new PostgreSQLCommandCompletePacket(sqlCommand, connectionContext.getUpdateCount());
+ connectionContext.clearContext();
+ return result;
+ }
+
+ private Optional<DatabasePacket<?>> getPacketFromQueryCommandExecutors() throws SQLException {
+ Iterator<QueryCommandExecutor> iterator = queryCommandExecutors.iterator();
+ while (iterator.hasNext()) {
+ QueryCommandExecutor next = iterator.next();
+ if (next.next()) {
+ return Optional.of(next.getQueryRowPacket());
+ } else {
+ iterator.remove();
+ }
+ }
+ return Optional.empty();
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutor.java
index fb82905..a5859ba 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutor.java
@@ -18,16 +18,18 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.parse;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.ConnectionScopeBinaryStatementRegistry;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.parse.PostgreSQLComParsePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.parse.PostgreSQLParseCompletePacket;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
-import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
import java.util.Collection;
import java.util.Collections;
@@ -37,26 +39,25 @@ import java.util.Collections;
*/
public final class PostgreSQLComParseExecutor implements CommandExecutor {
- private final PostgreSQLComParsePacket packet;
-
- private final String schemaName;
-
- private final ConnectionScopeBinaryStatementRegistry binaryStatementRegistry;
+ public PostgreSQLComParseExecutor(final PostgreSQLConnectionContext connectionContext, final PostgreSQLComParsePacket packet, final BackendConnection backendConnection) {
+ String schemaName = backendConnection.getSchemaName();
+ ConnectionScopeBinaryStatementRegistry binaryStatementRegistry = PostgreSQLBinaryStatementRegistry.getInstance().get(backendConnection.getConnectionId());
+ SQLStatement sqlStatement = parseSql(packet.getSql(), schemaName);
+ connectionContext.setSqlStatement(sqlStatement);
+ binaryStatementRegistry.register(packet.getStatementId(), packet.getSql(), sqlStatement.getParameterCount(), packet.getBinaryStatementColumnTypes());
+ }
- public PostgreSQLComParseExecutor(final PostgreSQLComParsePacket packet, final BackendConnection backendConnection) {
- this.packet = packet;
- schemaName = backendConnection.getSchemaName();
- binaryStatementRegistry = PostgreSQLBinaryStatementRegistry.getInstance().get(backendConnection.getConnectionId());
+ private SQLStatement parseSql(final String sql, final String schemaName) {
+ if (sql.isEmpty()) {
+ return new EmptyStatement();
+ }
+ ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(
+ DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
+ return sqlStatementParserEngine.parse(sql, true);
}
@Override
public Collection<DatabasePacket<?>> execute() {
- if (!packet.getSql().isEmpty()) {
- ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine(
- DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getMetaData(schemaName).getResource().getDatabaseType()));
- SQLStatement sqlStatement = sqlStatementParserEngine.parse(packet.getSql(), true);
- binaryStatementRegistry.register(packet.getStatementId(), packet.getSql(), sqlStatement.getParameterCount(), packet.getBinaryStatementColumnTypes());
- }
return Collections.singletonList(new PostgreSQLParseCompletePacket());
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutor.java
index de249fa..70cf416 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutor.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQ
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import java.sql.SQLException;
import java.util.Collection;
@@ -34,10 +35,13 @@ import java.util.Collections;
@RequiredArgsConstructor
public final class PostgreSQLComSyncExecutor implements QueryCommandExecutor {
+ private final PostgreSQLConnectionContext connectionContext;
+
private final BackendConnection backendConnection;
@Override
public Collection<DatabasePacket<?>> execute() {
+ connectionContext.clearContext();
return Collections.singletonList(new PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
index 17d416f..51d3725 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/text/PostgreSQLComQueryExecutor.java
@@ -62,13 +62,13 @@ public final class PostgreSQLComQueryExecutor implements QueryCommandExecutor {
public Collection<DatabasePacket<?>> execute() throws SQLException {
ResponseHeader responseHeader = textProtocolBackendHandler.execute();
if (responseHeader instanceof QueryResponseHeader) {
- return Collections.singletonList(createQueryPacket((QueryResponseHeader) responseHeader));
+ return Collections.singletonList(createRowDescriptionPacket((QueryResponseHeader) responseHeader));
}
responseType = ResponseType.UPDATE;
return Collections.singletonList(createUpdatePacket((UpdateResponseHeader) responseHeader));
}
- private PostgreSQLRowDescriptionPacket createQueryPacket(final QueryResponseHeader queryResponseHeader) {
+ private PostgreSQLRowDescriptionPacket createRowDescriptionPacket(final QueryResponseHeader queryResponseHeader) {
Collection<PostgreSQLColumnDescription> columnDescriptions = createColumnDescriptions(queryResponseHeader);
responseType = ResponseType.QUERY;
return new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions);
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
index 3d26671..ec753b1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngineTest.java
@@ -45,7 +45,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -53,6 +52,9 @@ import static org.mockito.Mockito.when;
public final class PostgreSQLCommandExecuteEngineTest {
@Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Mock
private ChannelHandlerContext channelHandlerContext;
@Mock
@@ -71,18 +73,27 @@ public final class PostgreSQLCommandExecuteEngineTest {
}
@Test
+ public void assertSimpleQueryWithUpdateResponseWriteQueryData() throws SQLException {
+ PostgreSQLComQueryExecutor comQueryExecutor = mock(PostgreSQLComQueryExecutor.class);
+ when(comQueryExecutor.getResponseType()).thenReturn(ResponseType.UPDATE);
+ PostgreSQLCommandExecuteEngine commandExecuteEngine = new PostgreSQLCommandExecuteEngine();
+ boolean actual = commandExecuteEngine.writeQueryData(channelHandlerContext, backendConnection, comQueryExecutor, 0);
+ assertTrue(actual);
+ verify(channelHandlerContext).write(any(PostgreSQLReadyForQueryPacket.class));
+ }
+
+ @Test
public void assertWriteQueryDataWithUpdate() throws SQLException {
PostgreSQLCommandExecuteEngine commandExecuteEngine = new PostgreSQLCommandExecuteEngine();
when(queryCommandExecutor.getResponseType()).thenReturn(ResponseType.UPDATE);
boolean actual = commandExecuteEngine.writeQueryData(channelHandlerContext, backendConnection, queryCommandExecutor, 0);
- assertTrue(actual);
- verify(channelHandlerContext, times(1)).write(isA(PostgreSQLReadyForQueryPacket.class));
+ assertFalse(actual);
}
@Test
public void assertWriteQueryDataWithComSync() throws SQLException {
PostgreSQLCommandExecuteEngine commandExecuteEngine = new PostgreSQLCommandExecuteEngine();
- boolean actual = commandExecuteEngine.writeQueryData(channelHandlerContext, backendConnection, new PostgreSQLComSyncExecutor(backendConnection), 0);
+ boolean actual = commandExecuteEngine.writeQueryData(channelHandlerContext, backendConnection, new PostgreSQLComSyncExecutor(connectionContext, backendConnection), 0);
assertTrue(actual);
verify(channelHandlerContext, never()).write(any(Object.class));
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactoryTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactoryTest.java
index 0bcf87c..eb24935 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactoryTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecutorFactoryTest.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatementRegistry;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.close.PostgreSQLComClosePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.parse.PostgreSQLComParsePacket;
@@ -35,7 +36,11 @@ import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.
import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.parse.PostgreSQLComParseExecutor;
import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.sync.PostgreSQLComSyncExecutor;
import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text.PostgreSQLComQueryExecutor;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
import java.util.Arrays;
@@ -43,23 +48,43 @@ import java.util.Collection;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public final class PostgreSQLCommandExecutorFactoryTest {
+ @Mock
+ private BackendConnection backendConnection;
+
+ @BeforeClass
+ public static void setup() {
+ PostgreSQLBinaryStatementRegistry.getInstance().register(0);
+ }
+
+ @Test
+ public void assertPendingCommandExecutors() throws SQLException {
+ PostgreSQLConnectionContext connectionContext = mock(PostgreSQLConnectionContext.class);
+ Collection<CommandExecutor> pendingCommandExecutors = mock(Collection.class);
+ when(connectionContext.getPendingExecutors()).thenReturn(pendingCommandExecutors);
+ PostgreSQLCommandExecutorFactory.newInstance(PostgreSQLCommandPacketType.CLOSE_COMMAND, mock(PostgreSQLComClosePacket.class), backendConnection, connectionContext);
+ PostgreSQLCommandExecutorFactory.newInstance(PostgreSQLCommandPacketType.BIND_COMMAND, mock(PostgreSQLComBindPacket.class), backendConnection, connectionContext);
+ PostgreSQLCommandExecutorFactory.newInstance(PostgreSQLCommandPacketType.DESCRIBE_COMMAND, null, backendConnection, connectionContext);
+ verify(pendingCommandExecutors).add(any(PostgreSQLComCloseExecutor.class));
+ verify(pendingCommandExecutors).add(any(PostgreSQLComBindExecutor.class));
+ verify(pendingCommandExecutors).add(any(PostgreSQLComDescribeExecutor.class));
+ }
+
@Test
public void assertNewInstance() throws SQLException {
- BackendConnection backendConnection = mock(BackendConnection.class);
when(backendConnection.getSchemaName()).thenReturn("schema");
Collection<InputOutput> inputOutputs = Arrays.asList(
new InputOutput(PostgreSQLCommandPacketType.SIMPLE_QUERY, PostgreSQLComQueryPacket.class, PostgreSQLComQueryExecutor.class),
new InputOutput(PostgreSQLCommandPacketType.PARSE_COMMAND, PostgreSQLComParsePacket.class, PostgreSQLComParseExecutor.class),
- new InputOutput(PostgreSQLCommandPacketType.BIND_COMMAND, PostgreSQLComBindPacket.class, PostgreSQLComBindExecutor.class),
- new InputOutput(PostgreSQLCommandPacketType.DESCRIBE_COMMAND, null, PostgreSQLComDescribeExecutor.class),
new InputOutput(PostgreSQLCommandPacketType.EXECUTE_COMMAND, null, PostgreSQLComExecuteExecutor.class),
new InputOutput(PostgreSQLCommandPacketType.SYNC_COMMAND, null, PostgreSQLComSyncExecutor.class),
- new InputOutput(PostgreSQLCommandPacketType.CLOSE_COMMAND, PostgreSQLComClosePacket.class, PostgreSQLComCloseExecutor.class),
new InputOutput(PostgreSQLCommandPacketType.TERMINATE, null, PostgreSQLComTerminationExecutor.class)
);
for (InputOutput inputOutput : inputOutputs) {
@@ -68,7 +93,7 @@ public final class PostgreSQLCommandExecutorFactoryTest {
commandPacketClass = PostgreSQLCommandPacket.class;
}
PostgreSQLCommandPacket packet = preparePacket(commandPacketClass);
- CommandExecutor actual = PostgreSQLCommandExecutorFactory.newInstance(inputOutput.getCommandPacketType(), packet, mock(BackendConnection.class));
+ CommandExecutor actual = PostgreSQLCommandExecutorFactory.newInstance(inputOutput.getCommandPacketType(), packet, backendConnection, mock(PostgreSQLConnectionContext.class));
assertThat(actual, instanceOf(inputOutput.getResultClass()));
}
}
@@ -78,6 +103,10 @@ public final class PostgreSQLCommandExecutorFactoryTest {
if (result instanceof PostgreSQLComQueryPacket) {
when(((PostgreSQLComQueryPacket) result).getSql()).thenReturn("");
}
+ if (result instanceof PostgreSQLComParsePacket) {
+ when(((PostgreSQLComParsePacket) result).getStatementId()).thenReturn("S_0");
+ when(((PostgreSQLComParsePacket) result).getSql()).thenReturn("");
+ }
return result;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutorTest.java
index 94b30e4..6dc7672 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/generic/PostgreSQLUnsupportedCommandExecutorTest.java
@@ -17,11 +17,13 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.generic;
-import java.util.Collection;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import org.junit.Test;
+import java.util.Collection;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -30,7 +32,7 @@ public final class PostgreSQLUnsupportedCommandExecutorTest {
@Test
public void assertExecute() {
- PostgreSQLUnsupportedCommandExecutor commandExecutor = new PostgreSQLUnsupportedCommandExecutor();
+ PostgreSQLUnsupportedCommandExecutor commandExecutor = new PostgreSQLUnsupportedCommandExecutor(new PostgreSQLConnectionContext());
Collection<DatabasePacket<?>> actual = commandExecutor.execute();
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), instanceOf(PostgreSQLErrorResponsePacket.class));
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
index fa7ac01..bce3b95 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/bind/PostgreSQLComBindExecutorTest.java
@@ -20,12 +20,10 @@ package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary
import lombok.SneakyThrows;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBinaryResultSetRowPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
@@ -33,7 +31,10 @@ import org.apache.shardingsphere.proxy.backend.response.header.query.QueryRespon
import org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe.PostgreSQLComDescribeExecutor;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -45,11 +46,11 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Optional;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -60,6 +61,12 @@ import static org.mockito.Mockito.when;
public final class PostgreSQLComBindExecutorTest {
@Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Mock
+ private PostgreSQLComDescribeExecutor describeExecutor;
+
+ @Mock
private PostgreSQLComBindPacket bindPacket;
@Mock
@@ -68,50 +75,54 @@ public final class PostgreSQLComBindExecutorTest {
@Mock
private DatabaseCommunicationEngine databaseCommunicationEngine;
+ @Before
+ public void setup() {
+ when(bindPacket.getSql()).thenReturn("");
+ }
+
@Test
public void assertExecuteEmptyBindPacket() throws SQLException {
- PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(bindPacket, backendConnection);
+ PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
Collection<DatabasePacket<?>> actual = executor.execute();
assertThat(actual.size(), is(1));
assertThat(actual.iterator().next(), is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertNull(executor.getResponseType());
+ assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
assertFalse(executor.next());
}
@Test
public void assertExecuteBindPacketWithQuerySQLAndReturnEmptyResult() throws SQLException {
+ when(connectionContext.getDescribeExecutor()).thenReturn(Optional.of(describeExecutor));
QueryResponseHeader queryResponseHeader = mock(QueryResponseHeader.class);
when(databaseCommunicationEngine.execute()).thenReturn(queryResponseHeader);
- PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(bindPacket, backendConnection);
+ PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
setMockFieldIntoExecutor(executor);
Collection<DatabasePacket<?>> actual = executor.execute();
- assertThat(actual.size(), is(2));
- Iterator<DatabasePacket<?>> actualPackets = actual.iterator();
- assertThat(actualPackets.next(), is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertThat(actualPackets.next(), is(instanceOf(PostgreSQLRowDescriptionPacket.class)));
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(instanceOf(PostgreSQLBindCompletePacket.class)));
assertThat(executor.getResponseType(), is(ResponseType.QUERY));
verify(queryResponseHeader).getQueryHeaders();
}
@Test
public void assertExecuteBindPacketWithQuerySQL() throws SQLException {
+ when(connectionContext.getDescribeExecutor()).thenReturn(Optional.of(describeExecutor));
QueryResponseHeader queryResponseHeader = mock(QueryResponseHeader.class);
when(queryResponseHeader.getQueryHeaders()).thenReturn(Collections.singletonList(new QueryHeader("schema", "table", "label", "column", 1, "type", 2, 3, true, true, true, true)));
when(databaseCommunicationEngine.execute()).thenReturn(queryResponseHeader);
- PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(bindPacket, backendConnection);
+ PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
setMockFieldIntoExecutor(executor);
Collection<DatabasePacket<?>> actual = executor.execute();
- assertThat(actual.size(), is(2));
+ assertThat(actual.size(), is(1));
Iterator<DatabasePacket<?>> actualPackets = actual.iterator();
assertThat(actualPackets.next(), is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertThat(actualPackets.next(), is(instanceOf(PostgreSQLRowDescriptionPacket.class)));
assertThat(executor.getResponseType(), is(ResponseType.QUERY));
}
@Test
public void assertNext() throws SQLException {
when(databaseCommunicationEngine.next()).thenReturn(true, false);
- PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(bindPacket, backendConnection);
+ PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
setMockFieldIntoExecutor(executor);
assertTrue(executor.next());
assertFalse(executor.next());
@@ -121,7 +132,7 @@ public final class PostgreSQLComBindExecutorTest {
public void assertDataRowNotBinary() throws SQLException {
QueryResponseRow queryResponseRow = mock(QueryResponseRow.class);
when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(queryResponseRow);
- PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(bindPacket, backendConnection);
+ PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
setMockFieldIntoExecutor(executor);
PostgreSQLPacket actualQueryRowPacket = executor.getQueryRowPacket();
verify(queryResponseRow).getData();
@@ -133,7 +144,7 @@ public final class PostgreSQLComBindExecutorTest {
when(bindPacket.isBinaryRowData()).thenReturn(true);
QueryResponseRow queryResponseRow = mock(QueryResponseRow.class);
when(databaseCommunicationEngine.getQueryResponseRow()).thenReturn(queryResponseRow);
- PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(bindPacket, backendConnection);
+ PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
setMockFieldIntoExecutor(executor);
PostgreSQLPacket actualQueryRowPacket = executor.getQueryRowPacket();
verify(queryResponseRow).getCells();
@@ -143,13 +154,11 @@ public final class PostgreSQLComBindExecutorTest {
@Test
public void assertExecuteBindPacketWithUpdateSQL() throws SQLException {
when(databaseCommunicationEngine.execute()).thenReturn(new UpdateResponseHeader(mock(InsertStatement.class)));
- PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(bindPacket, backendConnection);
+ PostgreSQLComBindExecutor executor = new PostgreSQLComBindExecutor(connectionContext, bindPacket, backendConnection);
setMockFieldIntoExecutor(executor);
Collection<DatabasePacket<?>> actual = executor.execute();
- assertThat(actual.size(), is(2));
- Iterator<DatabasePacket<?>> actualPackets = actual.iterator();
- assertThat(actualPackets.next(), is(instanceOf(PostgreSQLBindCompletePacket.class)));
- assertThat(actualPackets.next(), is(instanceOf(PostgreSQLCommandCompletePacket.class)));
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(instanceOf(PostgreSQLBindCompletePacket.class)));
assertThat(executor.getResponseType(), is(ResponseType.UPDATE));
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutorTest.java
index 453e7c2..f020b96 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/describe/PostgreSQLComDescribeExecutorTest.java
@@ -17,18 +17,42 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.describe;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLNoDataPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
-import java.util.Collections;
+import java.util.Collection;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+@RunWith(MockitoJUnitRunner.class)
public final class PostgreSQLComDescribeExecutorTest {
+ @Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Test
+ public void assertDescribeEmptyStatement() {
+ Collection<DatabasePacket<?>> actual = new PostgreSQLComDescribeExecutor(connectionContext).execute();
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(instanceOf(PostgreSQLNoDataPacket.class)));
+ }
+
@Test
- public void assertNewInstance() {
- PostgreSQLComDescribeExecutor actual = new PostgreSQLComDescribeExecutor();
- assertThat(actual.execute(), is(Collections.emptyList()));
+ public void assertDescribeRows() {
+ PostgreSQLComDescribeExecutor describeExecutor = new PostgreSQLComDescribeExecutor(connectionContext);
+ PostgreSQLRowDescriptionPacket expected = mock(PostgreSQLRowDescriptionPacket.class);
+ describeExecutor.setRowDescriptionPacket(expected);
+ Collection<DatabasePacket<?>> actual = describeExecutor.execute();
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), is(expected));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
index b060fcf..dd46dfb 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutorTest.java
@@ -17,18 +17,78 @@
package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.execute;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLEmptyQueryResponsePacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket;
+import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor;
+import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.EmptyStatement;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Optional;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public final class PostgreSQLComExecuteExecutorTest {
+ @Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Mock
+ private QueryCommandExecutor queryCommandExecutor;
+
+ @Mock
+ private PostgreSQLPacket postgreSQLPacket;
+
+ @Mock
+ private PostgreSQLDataRowPacket dataRowPacket;
+
+ @Test
+ public void assertExecuteQuery() throws SQLException {
+ when(connectionContext.getPendingExecutors()).thenReturn(new ArrayList<>(Collections.singletonList(queryCommandExecutor)));
+ when(queryCommandExecutor.execute()).thenReturn(Collections.singletonList(postgreSQLPacket));
+ when(queryCommandExecutor.next()).thenReturn(true, false);
+ when((PostgreSQLDataRowPacket) queryCommandExecutor.getQueryRowPacket()).thenReturn(dataRowPacket);
+ PostgreSQLComExecuteExecutor actual = new PostgreSQLComExecuteExecutor(connectionContext);
+ Collection<DatabasePacket<?>> actualPackets = actual.execute();
+ assertThat(actualPackets.size(), is(1));
+ assertThat(actualPackets.iterator().next(), is(postgreSQLPacket));
+ assertTrue(actual.next());
+ assertThat(actual.getQueryRowPacket(), is(dataRowPacket));
+ assertTrue(actual.next());
+ assertThat(actual.getQueryRowPacket(), is(instanceOf(PostgreSQLCommandCompletePacket.class)));
+ assertFalse(actual.next());
+ }
+
+ @Test
+ public void assertExecuteUpdate() throws SQLException {
+ when(connectionContext.getSqlStatement()).thenReturn(Optional.of(mock(EmptyStatement.class)));
+ PostgreSQLComExecuteExecutor actual = new PostgreSQLComExecuteExecutor(connectionContext);
+ assertTrue(actual.next());
+ assertThat(actual.getQueryRowPacket(), is(instanceOf(PostgreSQLEmptyQueryResponsePacket.class)));
+ assertFalse(actual.next());
+ }
+
@Test
- public void assertNewInstance() {
- PostgreSQLComExecuteExecutor actual = new PostgreSQLComExecuteExecutor();
- assertThat(actual.execute(), is(Collections.emptyList()));
+ public void assertResponseType() {
+ ResponseType actual = new PostgreSQLComExecuteExecutor(connectionContext).getResponseType();
+ assertThat(actual, is(ResponseType.QUERY));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutorTest.java
index 6f2ad0d..233a807 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/parse/PostgreSQLComParseExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -48,6 +49,9 @@ import static org.mockito.Mockito.when;
public final class PostgreSQLComParseExecutorTest {
@Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Mock
private PostgreSQLComParsePacket parsePacket;
@Mock
@@ -64,7 +68,7 @@ public final class PostgreSQLComParseExecutorTest {
metaDataContexts.set(ProxyContext.getInstance(), new StandardMetaDataContexts(getMetaDataMap(),
mock(ShardingSphereRuleMetaData.class), mock(ExecutorEngine.class), new ConfigurationProperties(new Properties())));
PostgreSQLBinaryStatementRegistry.getInstance().register(1);
- PostgreSQLComParseExecutor actual = new PostgreSQLComParseExecutor(parsePacket, backendConnection);
+ PostgreSQLComParseExecutor actual = new PostgreSQLComParseExecutor(connectionContext, parsePacket, backendConnection);
assertThat(actual.execute().iterator().next(), instanceOf(PostgreSQLParseCompletePacket.class));
}
@@ -76,9 +80,11 @@ public final class PostgreSQLComParseExecutorTest {
@Test
public void assertGetSqlWithNull() {
+ when(parsePacket.getStatementId()).thenReturn("");
when(parsePacket.getSql()).thenReturn("");
when(backendConnection.getConnectionId()).thenReturn(1);
- PostgreSQLComParseExecutor actual = new PostgreSQLComParseExecutor(parsePacket, backendConnection);
+ PostgreSQLBinaryStatementRegistry.getInstance().register(1);
+ PostgreSQLComParseExecutor actual = new PostgreSQLComParseExecutor(connectionContext, parsePacket, backendConnection);
assertThat(actual.execute().iterator().next(), instanceOf(PostgreSQLParseCompletePacket.class));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutorTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutorTest.java
index 15cc35b..e20a121 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutorTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/sync/PostgreSQLComSyncExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQ
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType;
+import org.apache.shardingsphere.proxy.frontend.postgresql.command.PostgreSQLConnectionContext;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,26 +41,29 @@ import static org.mockito.Mockito.when;
public final class PostgreSQLComSyncExecutorTest {
@Mock
+ private PostgreSQLConnectionContext connectionContext;
+
+ @Mock
private BackendConnection backendConnection;
@Test
public void assertNewInstance() {
when(backendConnection.getTransactionStatus()).thenReturn(new TransactionStatus(TransactionType.LOCAL));
- PostgreSQLComSyncExecutor actual = new PostgreSQLComSyncExecutor(backendConnection);
+ PostgreSQLComSyncExecutor actual = new PostgreSQLComSyncExecutor(connectionContext, backendConnection);
assertThat(actual.execute().iterator().next(), is(instanceOf(PostgreSQLReadyForQueryPacket.class)));
}
@Test(expected = UnsupportedOperationException.class)
@SneakyThrows(SQLException.class)
public void assertNextFalse() {
- PostgreSQLComSyncExecutor actual = new PostgreSQLComSyncExecutor(backendConnection);
+ PostgreSQLComSyncExecutor actual = new PostgreSQLComSyncExecutor(connectionContext, backendConnection);
assertFalse(actual.next());
actual.getQueryRowPacket();
}
@Test
public void assertResponseType() {
- ResponseType actual = new PostgreSQLComSyncExecutor(backendConnection).getResponseType();
+ ResponseType actual = new PostgreSQLComSyncExecutor(connectionContext, backendConnection).getResponseType();
assertThat(actual, is(ResponseType.UPDATE));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java
index 8503bb8..041ff3d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-spi/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecuteEngine.java
@@ -68,9 +68,10 @@ public interface CommandExecuteEngine {
* Get error packet.
*
* @param cause cause of error
+ * @param backendConnection backend connection
* @return error packet
*/
- DatabasePacket<?> getErrorPacket(Exception cause);
+ DatabasePacket<?> getErrorPacket(Exception cause, BackendConnection backendConnection);
/**
* Get other packet.