You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/13 11:00:51 UTC

[shardingsphere] branch master updated: Fix PostgreSQL Proxy duplicate ReadyForQuery (#12987)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 61b19b5  Fix PostgreSQL Proxy duplicate ReadyForQuery (#12987)
61b19b5 is described below

commit 61b19b57b7101b1c03eb7b345a225a9db49d32a9
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Wed Oct 13 19:00:02 2021 +0800

    Fix PostgreSQL Proxy duplicate ReadyForQuery (#12987)
---
 .../command/PostgreSQLCommandPacketType.java       | 18 ++++++++++++++++++
 .../command/PostgreSQLCommandPacketTypeTest.java   | 22 +++++++++++++---------
 .../PostgreSQLMessagePacketTypeTest.java}          |  5 ++---
 .../command/PostgreSQLCommandExecuteEngine.java    | 16 ++++++++++++++--
 .../command/PostgreSQLConnectionContext.java       | 12 ++++++++++--
 .../execute/PostgreSQLComExecuteExecutor.java      |  2 +-
 6 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
index 014ad2d..def5e13 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketType.java
@@ -22,6 +22,10 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * Command packet type for PostgreSQL.
  * 
@@ -49,6 +53,10 @@ public enum PostgreSQLCommandPacketType implements CommandPacketType, PostgreSQL
     
     TERMINATE('X');
     
+    private static final Set<PostgreSQLCommandPacketType> EXTENDED_PROTOCOL_PACKET_TYPE = new HashSet<>(Arrays.asList(PostgreSQLCommandPacketType.PARSE_COMMAND,
+            PostgreSQLCommandPacketType.BIND_COMMAND, PostgreSQLCommandPacketType.DESCRIBE_COMMAND, PostgreSQLCommandPacketType.EXECUTE_COMMAND, PostgreSQLCommandPacketType.SYNC_COMMAND,
+            PostgreSQLCommandPacketType.CLOSE_COMMAND, PostgreSQLCommandPacketType.FLUSH_COMMAND));
+    
     private final char value;
     
     /**
@@ -65,4 +73,14 @@ public enum PostgreSQLCommandPacketType implements CommandPacketType, PostgreSQL
         }
         throw new IllegalArgumentException(String.format("Cannot find '%s' in PostgreSQL command packet type", value));
     }
+    
+    /**
+     * Check whether the packet type is an extended protocol packet type.
+     *
+     * @param commandPacketType command packet type
+     * @return is extended protocol packet type
+     */
+    public static boolean isExtendedProtocolPacketType(final PostgreSQLCommandPacketType commandPacketType) {
+        return EXTENDED_PROTOCOL_PACKET_TYPE.contains(commandPacketType);
+    }
 }
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
index 8fed55b..cb89948 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
@@ -17,21 +17,25 @@
 
 package org.apache.shardingsphere.db.protocol.postgresql.packet.command;
 
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
 import org.junit.Test;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public final class PostgreSQLCommandPacketTypeTest {
     
-    @Test
-    public void assertValueOf() {
-        assertThat(PostgreSQLMessagePacketType.valueOf(PostgreSQLMessagePacketType.AUTHENTICATION_REQUEST.getValue()), is(PostgreSQLMessagePacketType.AUTHENTICATION_REQUEST));
+    @Test(expected = IllegalArgumentException.class)
+    public void assertValueOfUnknownCommandPacketType() {
+        PostgreSQLCommandPacketType.valueOf(-1);
     }
     
-    @Test(expected = IllegalArgumentException.class)
-    public void assertGetValueWithIllegalArgument() {
-        PostgreSQLMessagePacketType.valueOf(-1);
+    @Test
+    public void assertValueOfExtendedProtocolCommandPacketType() {
+        assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.PARSE_COMMAND));
+        assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.BIND_COMMAND));
+        assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.DESCRIBE_COMMAND));
+        assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.EXECUTE_COMMAND));
+        assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.CLOSE_COMMAND));
+        assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.FLUSH_COMMAND));
+        assertTrue(PostgreSQLCommandPacketType.isExtendedProtocolPacketType(PostgreSQLCommandPacketType.SYNC_COMMAND));
     }
 }
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/identifier/PostgreSQLMessagePacketTypeTest.java
similarity index 89%
copy from shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
copy to shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/identifier/PostgreSQLMessagePacketTypeTest.java
index 8fed55b..b412c46 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/command/PostgreSQLCommandPacketTypeTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/identifier/PostgreSQLMessagePacketTypeTest.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.db.protocol.postgresql.packet.command;
+package org.apache.shardingsphere.db.protocol.postgresql.packet.identifier;
 
-import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-public final class PostgreSQLCommandPacketTypeTest {
+public final class PostgreSQLMessagePacketTypeTest {
     
     @Test
     public void assertValueOf() {
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index 7a10125..14a9cf1 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -43,6 +43,7 @@ import org.apache.shardingsphere.proxy.frontend.postgresql.command.query.text.Po
 import org.apache.shardingsphere.proxy.frontend.postgresql.err.PostgreSQLErrPacketFactory;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.Optional;
 
 /**
@@ -57,24 +58,35 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
     
     @Override
     public PostgreSQLCommandPacket getCommandPacket(final PacketPayload payload, final CommandPacketType type, final BackendConnection backendConnection) {
+        PostgreSQLConnectionContext connectionContext = PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+        connectionContext.setCurrentPacketType((PostgreSQLCommandPacketType) type);
         return PostgreSQLCommandPacketFactory.newInstance((PostgreSQLCommandPacketType) type, (PostgreSQLPacketPayload) payload, backendConnection.getConnectionId());
     }
     
     @Override
     public CommandExecutor getCommandExecutor(final CommandPacketType type, final CommandPacket packet, final BackendConnection backendConnection) throws SQLException {
         PostgreSQLConnectionContext connectionContext = PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+        if (connectionContext.isErrorOccurred() && PostgreSQLCommandPacketType.isExtendedProtocolPacketType((PostgreSQLCommandPacketType) type) && PostgreSQLCommandPacketType.SYNC_COMMAND != type) {
+            return Collections::emptyList;
+        }
         return PostgreSQLCommandExecutorFactory.newInstance((PostgreSQLCommandPacketType) type, (PostgreSQLCommandPacket) packet, backendConnection, connectionContext);
     }
     
     @Override
     public DatabasePacket<?> getErrorPacket(final Exception cause, final BackendConnection backendConnection) {
-        PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId()).getPendingExecutors().clear();
+        PostgreSQLConnectionContext connectionContext = PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+        if (PostgreSQLCommandPacketType.isExtendedProtocolPacketType(connectionContext.getCurrentPacketType())) {
+            connectionContext.setErrorOccurred(true);
+            connectionContext.getPendingExecutors().clear();
+        }
         return PostgreSQLErrPacketFactory.newInstance(cause);
     }
     
     @Override
     public Optional<DatabasePacket<?>> getOtherPacket(final BackendConnection backendConnection) {
-        return Optional.of(new PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
+        PostgreSQLConnectionContext connectionContext = PostgreSQLConnectionContextRegistry.getInstance().get(backendConnection.getConnectionId());
+        return PostgreSQLCommandPacketType.isExtendedProtocolPacketType(connectionContext.getCurrentPacketType()) ? Optional.empty()
+                : Optional.of(new PostgreSQLReadyForQueryPacket(backendConnection.getTransactionStatus().isInTransaction()));
     }
     
     @Override
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
index 2a1c836..1ca380d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLConnectionContext.java
@@ -17,9 +17,11 @@
 
 package org.apache.shardingsphere.proxy.frontend.postgresql.command;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLValueFormat;
+import org.apache.shardingsphere.db.protocol.postgresql.packet.command.PostgreSQLCommandPacketType;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.PostgreSQLBinaryStatement;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.frontend.command.executor.CommandExecutor;
@@ -37,17 +39,21 @@ import java.util.Optional;
 /**
  * PostgreSQL connection context.
  */
+@Getter
 @Setter
 public final class PostgreSQLConnectionContext {
     
+    @Getter(AccessLevel.NONE)
     private final Map<String, PostgreSQLPortal> portals = new LinkedHashMap<>();
     
-    @Getter
     private final Collection<CommandExecutor> pendingExecutors = new LinkedList<>();
     
-    @Getter
     private long updateCount;
     
+    private PostgreSQLCommandPacketType currentPacketType;
+    
+    private boolean errorOccurred;
+    
     /**
      * Create a portal.
      *
@@ -124,5 +130,7 @@ public final class PostgreSQLConnectionContext {
     public void clearContext() {
         pendingExecutors.clear();
         updateCount = 0;
+        currentPacketType = null;
+        errorOccurred = false;
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
index 05749e9..d22dfc6 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/binary/execute/PostgreSQLComExecuteExecutor.java
@@ -89,7 +89,7 @@ public final class PostgreSQLComExecuteExecutor implements CommandExecutor {
         }
         String sqlCommand = PostgreSQLCommand.valueOf(portal.getSqlStatement().getClass()).map(PostgreSQLCommand::getTag).orElse("");
         PostgreSQLCommandCompletePacket result = new PostgreSQLCommandCompletePacket(sqlCommand, Math.max(dataRows, connectionContext.getUpdateCount()));
-        connectionContext.clearContext();
+        connectionContext.setUpdateCount(0);
         return result;
     }