You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target is not preserved in this plain text copy).
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/30 05:57:27 UTC

[shardingsphere] branch master updated: Consider transaction failed in PostgreSQL (#15208)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 65abf4f  Consider transaction failed in PostgreSQL (#15208)
65abf4f is described below

commit 65abf4fbc20ed9641734acaaf7284e55942ad26d
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Sun Jan 30 13:56:36 2022 +0800

    Consider transaction failed in PostgreSQL (#15208)
---
 .../generic/PostgreSQLReadyForQueryPacket.java     | 27 +++++++++-----------
 .../generic/PostgreSQLReadyForQueryPacketTest.java | 29 ++++++++++++++--------
 .../OpenGaussAuthenticationEngine.java             |  2 +-
 .../PostgreSQLAuthenticationEngine.java            |  2 +-
 .../command/PostgreSQLCommandExecuteEngine.java    |  8 +++---
 .../extended/sync/PostgreSQLComSyncExecutor.java   |  2 +-
 6 files changed, 38 insertions(+), 32 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
index f017877..ce444df 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
@@ -17,34 +17,31 @@
 
 package org.apache.shardingsphere.db.protocol.postgresql.packet.generic;
 
+import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
 import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
 
 /**
  * Ready for query packet for PostgreSQL.
  */
-@RequiredArgsConstructor
-public final class PostgreSQLReadyForQueryPacket implements PostgreSQLIdentifierPacket {
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PostgreSQLReadyForQueryPacket implements PostgreSQLPacket {
     
-    private static final char IN_TRANSACTION = 'T';
+    public static final PostgreSQLReadyForQueryPacket NOT_IN_TRANSACTION = new PostgreSQLReadyForQueryPacket((byte) 'I');
     
-    private static final char NOT_IN_TRANSACTION = 'I';
+    public static final PostgreSQLReadyForQueryPacket IN_TRANSACTION = new PostgreSQLReadyForQueryPacket((byte) 'T');
     
-    // TODO consider about TRANSACTION_FAILED
-    private static final char TRANSACTION_FAILED = 'E';
+    public static final PostgreSQLReadyForQueryPacket TRANSACTION_FAILED = new PostgreSQLReadyForQueryPacket((byte) 'E');
     
-    private final boolean isInTransaction;
+    private static final byte[] PREFIX = new byte[]{(byte) PostgreSQLMessagePacketType.READY_FOR_QUERY.getValue(), 0, 0, 0, 5};
     
-    @Override
-    public void write(final PostgreSQLPacketPayload payload) {
-        payload.writeInt1(isInTransaction ? IN_TRANSACTION : NOT_IN_TRANSACTION);
-    }
+    private final byte status;
     
     @Override
-    public PostgreSQLIdentifierTag getIdentifier() {
-        return PostgreSQLMessagePacketType.READY_FOR_QUERY;
+    public void write(final PostgreSQLPacketPayload payload) {
+        payload.getByteBuf().writeBytes(PREFIX);
+        payload.getByteBuf().writeByte(status);
     }
 }
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
index ec26efa..a76d6bf 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.db.protocol.postgresql.packet.generic;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.ByteBufTestUtils;
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
 import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
 import org.junit.Test;
 
@@ -32,23 +31,31 @@ public final class PostgreSQLReadyForQueryPacketTest {
     
     @Test
     public void assertReadWriteWithInTransaction() {
-        ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1);
+        ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(6);
         PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
-        PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(true);
-        assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY));
+        PostgreSQLReadyForQueryPacket packet = PostgreSQLReadyForQueryPacket.IN_TRANSACTION;
         packet.write(payload);
-        assertThat(byteBuf.writerIndex(), is(1));
-        assertThat(byteBuf.readByte(), is((byte) 'T'));
+        assertThat(byteBuf.writerIndex(), is(6));
+        assertThat(byteBuf.getByte(5), is((byte) 'T'));
     }
     
     @Test
     public void assertReadWriteWithNotInTransaction() {
-        ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1);
+        ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(6);
         PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
-        PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(false);
-        assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY));
+        PostgreSQLReadyForQueryPacket packet = PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION;
         packet.write(payload);
-        assertThat(byteBuf.writerIndex(), is(1));
-        assertThat(byteBuf.readByte(), is((byte) 'I'));
+        assertThat(byteBuf.writerIndex(), is(6));
+        assertThat(byteBuf.getByte(5), is((byte) 'I'));
+    }
+    
+    @Test
+    public void assertReadWriteWithTransactionFailed() {
+        ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(6);
+        PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf, StandardCharsets.UTF_8);
+        PostgreSQLReadyForQueryPacket packet = PostgreSQLReadyForQueryPacket.TRANSACTION_FAILED;
+        packet.write(payload);
+        assertThat(byteBuf.writerIndex(), is(6));
+        assertThat(byteBuf.getByte(5), is((byte) 'E'));
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java
index a71d4f0..078fcaf 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-opengauss/src/main/java/org/apache/shardingsphere/proxy/frontend/opengauss/authentication/OpenGaussAuthenticationEngine.java
@@ -128,7 +128,7 @@ public final class OpenGaussAuthenticationEngine implements AuthenticationEngine
         context.write(new PostgreSQLParameterStatusPacket("client_encoding", clientEncoding));
         context.write(new PostgreSQLParameterStatusPacket("server_encoding", "UTF8"));
         context.write(new PostgreSQLParameterStatusPacket("integer_datetimes", "on"));
-        context.writeAndFlush(new PostgreSQLReadyForQueryPacket(false));
+        context.writeAndFlush(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
         return AuthenticationResultBuilder.finished(currentAuthResult.getUsername(), "", currentAuthResult.getDatabase());
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
index 9a3fd83..8eff4b7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
@@ -109,7 +109,7 @@ public final class PostgreSQLAuthenticationEngine implements AuthenticationEngin
         context.write(new PostgreSQLParameterStatusPacket("client_encoding", clientEncoding));
         context.write(new PostgreSQLParameterStatusPacket("server_encoding", "UTF8"));
         context.write(new PostgreSQLParameterStatusPacket("integer_datetimes", "on"));
-        context.writeAndFlush(new PostgreSQLReadyForQueryPacket(false));
+        context.writeAndFlush(PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
         return AuthenticationResultBuilder.finished(currentAuthResult.getUsername(), "", currentAuthResult.getDatabase());
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index 0d091ca..8035206 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -74,7 +74,7 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
     
     @Override
     public Optional<DatabasePacket<?>> getOtherPacket(final ConnectionSession connectionSession) {
-        return Optional.of(new PostgreSQLReadyForQueryPacket(connectionSession.getTransactionStatus().isInTransaction()));
+        return Optional.of(connectionSession.getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.TRANSACTION_FAILED : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
     }
     
     @Override
@@ -93,14 +93,16 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
     
     private boolean processSimpleQuery(final ChannelHandlerContext context, final JDBCBackendConnection backendConnection, final PostgreSQLComQueryExecutor queryExecutor) throws SQLException {
         if (ResponseType.UPDATE == queryExecutor.getResponseType()) {
-            context.write(new PostgreSQLReadyForQueryPacket(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction()));
+            context.write(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION
+                    : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
             return true;
         }
         long dataRows = writeDataPackets(context, backendConnection, queryExecutor);
         if (ResponseType.QUERY == queryExecutor.getResponseType()) {
             context.write(new PostgreSQLCommandCompletePacket(PostgreSQLCommand.SELECT.name(), dataRows));
         }
-        context.write(new PostgreSQLReadyForQueryPacket(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction()));
+        context.write(backendConnection.getConnectionSession().getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION
+                : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
         return true;
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java
index 4f8171d..d1ff9a7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/sync/PostgreSQLComSyncExecutor.java
@@ -36,6 +36,6 @@ public final class PostgreSQLComSyncExecutor implements CommandExecutor {
     
     @Override
     public Collection<DatabasePacket<?>> execute() {
-        return Collections.singleton(new PostgreSQLReadyForQueryPacket(connectionSession.getTransactionStatus().isInTransaction()));
+        return Collections.singletonList(connectionSession.getTransactionStatus().isInTransaction() ? PostgreSQLReadyForQueryPacket.IN_TRANSACTION : PostgreSQLReadyForQueryPacket.NOT_IN_TRANSACTION);
     }
 }