You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/30 05:57:27 UTC
[shardingsphere] branch master updated: Consider transaction failed in PostgreSQL (#15208)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 65abf4f Consider transaction failed in PostgreSQL (#15208)
65abf4f is described below
commit 65abf4fbc20ed9641734acaaf7284e55942ad26d
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Sun Jan 30 13:56:36 2022 +0800
Consider transaction failed in PostgreSQL (#15208)
---
.../generic/PostgreSQLReadyForQueryPacket.java | 27 +++++++++-----------
.../generic/PostgreSQLReadyForQueryPacketTest.java | 29 ++++++++++++++--------
.../OpenGaussAuthenticationEngine.java | 2 +-
.../PostgreSQLAuthenticationEngine.java | 2 +-
.../command/PostgreSQLCommandExecuteEngine.java | 8 +++---
.../extended/sync/PostgreSQLComSyncExecutor.java | 2 +-
6 files changed, 38 insertions(+), 32 deletions(-)
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
index f017877..ce444df 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
@@ -17,34 +17,31 @@
package org.apache.shardingsphere.db.protocol.postgresql.packet.generic;
+import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
/**
* Ready for query packet for PostgreSQL.
*/
-@RequiredArgsConstructor
-public final class PostgreSQLReadyForQueryPacket implements PostgreSQLIdentifierPacket {
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PostgreSQLReadyForQueryPacket implements PostgreSQLPacket {
- private static final char IN_TRANSACTION = 'T';
+ public static final PostgreSQLReadyForQueryPacket NOT_IN_TRANSACTION = new PostgreSQLReadyForQueryPacket((byte) 'I');
- private static final char NOT_IN_TRANSACTION = 'I';
+ public static final PostgreSQLReadyForQueryPacket IN_TRANSACTION = new PostgreSQLReadyForQueryPacket((byte) 'T');
- // TODO consider about TRANSACTION_FAILED
- private static final char TRANSACTION_FAILED = 'E';
+ public static final PostgreSQLReadyForQueryPacket TRANSACTION_FAILED = new PostgreSQLReadyForQueryPacket((byte) 'E');
- private final boolean isInTransaction;
+ private static final byte[] PREFIX = new byte[]{(byte) PostgreSQLMessagePacketType.READY_FOR_QUERY.getValue(), 0, 0, 0, 5};
- @Override
- public void write(final PostgreSQLPacketPayload payload) {
- payload.writeInt1(isInTransaction ? IN_TRANSACTION : NOT_IN_TRANSACTION);
- }
+ private final byte status;
@Override
- public PostgreSQLIdentifierTag getIdentifier() {
- return PostgreSQLMessagePacketType.READY_FOR_QUERY;
+ public void write(final PostgreSQLPacketPayload payload) {
+ payload.getByteBuf().writeBytes(PREFIX);
+ payload.getByteBuf().writeByte(status);
}
}
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
index ec26efa..a76d6bf 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.db.protocol.postgresql.packet.generic;
import io.netty.buffer.ByteBuf;
import org.apache.shardingsphere.db.protocol.postgresql.packet.ByteBufTestUtils;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
import org.junit.Test;
@@ -32,23 +31,31 @@ public final class PostgreSQLReadyForQueryPacketTest {
@Test
public void assertReadWriteWithInTransaction() {
- ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1);
+ ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(6);
PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
- PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(true);
- assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY));
+ PostgreSQLReadyForQueryPacket packet = PostgreSQLReadyForQueryPacket.IN_TRANSACTION;
packet.write(payload);
- assertThat(byteBuf.writerIndex(), is(1));
- assertThat(byteBuf.readByte(), is((byte) 'T'));
+ assertThat(byteBuf.writerIndex(), is(6));
+ assertThat(byteBuf.getByte(5), is((byte) 'T'));
}
@Test
public void assertReadWriteWithNotInTransaction() {
- ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1);
+ ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(6);
PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
- PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(false);
- assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY));
+ PostgreSQLReadyForQueryPacket packet = PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION;
packet.write(payload);
- assertThat(byteBuf.writerIndex(), is(1));
- assertThat(byteBuf.readByte(), is((byte) 'I'));
+ assertThat(byteBuf.writerIndex(), is(6));
+ assertThat(byteBuf.getByte(5), is((byte) 'I'));
+ }
+
+ @Test
+ public void assertReadWriteWithTransactionFailed() {
+ ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(6);
+ PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
+ PostgreSQLReadyForQueryPacket packet = PostgreSQLReadyForQueryPacket.TRANSACTION_FAILED;
+ packet.write(payload);
+ assertThat(byteBuf.writerIndex(), is(6));
+ assertThat(byteBuf.getByte(5), is((byte) 'E'));
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java
index a71d4f0..078fcaf 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java
@@ -128,7 +128,7 @@ public final class OpenGaussAuthenticationEngine implements AuthenticationEngine
context.write(new PostgreSQLParameterStatusPacket("client_encoding", clientEncoding));
context.write(new PostgreSQLParameterStatusPacket("server_encoding", "UTF8"));
context.write(new PostgreSQLParameterStatusPacket("integer_datetimes", "on"));
- context.writeAndFlush(new PostgreSQLReadyForQueryPacket(false));
+ context.writeAndFlush(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
return AuthenticationResultBuilder.finished(currentAuthResult.getUsername(), "", currentAuthResult.getDatabase());
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
index 9a3fd83..8eff4b7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
@@ -109,7 +109,7 @@ public final class PostgreSQLAuthenticationEngine implements AuthenticationEngin
context.write(new PostgreSQLParameterStatusPacket("client_encoding", clientEncoding));
context.write(new PostgreSQLParameterStatusPacket("server_encoding", "UTF8"));
context.write(new PostgreSQLParameterStatusPacket("integer_datetimes", "on"));
- context.writeAndFlush(new PostgreSQLReadyForQueryPacket(false));
+ context.writeAndFlush(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
return AuthenticationResultBuilder.finished(currentAuthResult.getUsername(), "", currentAuthResult.getDatabase());
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index 0d091ca..8035206 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -74,7 +74,7 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
@Override
public Optional<DatabasePacket<?>> getOtherPacket(final ConnectionSession connectionSession) {
- return Optional.of(new PostgreSQLReadyForQueryPacket(connectionSession.getTransactionStatus().isInTransaction()));
+ return Optional.of(connectionSession.getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.TRANSACTION_FAILED : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
}
@Override
@@ -93,14 +93,16 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
private boolean processSimpleQuery(final ChannelHandlerContext context, final JDBCBackendConnection backendConnection, final PostgreSQLComQueryExecutor queryExecutor) throws SQLException {
if (ResponseType.UPDATE == queryExecutor.getResponseType()) {
- context.write(new PostgreSQLReadyForQueryPacket(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction()));
+ context.write(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION
+ : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
return true;
}
long dataRows = writeDataPackets(context, backendConnection, queryExecutor);
if (ResponseType.QUERY == queryExecutor.getResponseType()) {
context.write(new PostgreSQLCommandCompletePacket(PostgreSQLCommand.SELECT.name(), dataRows));
}
- context.write(new PostgreSQLReadyForQueryPacket(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction()));
+ context.write(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION
+ : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
return true;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java
index 4f8171d..d1ff9a7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java
@@ -36,6 +36,6 @@ public final class PostgreSQLComSyncExecutor implements CommandExecutor {
@Override
public Collection<DatabasePacket<?>> execute() {
- return Collections.singleton(new PostgreSQLReadyForQueryPacket(connectionSession.getTransactionStatus().isInTransaction()));
+ return Collections.singletonList(connectionSession.getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
}
}