You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/04/24 06:40:29 UTC

[shardingsphere] branch master updated: Support Transaction in PostgreSQL (#10169)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 84a4891  Support Transaction in PostgreSQL (#10169)
84a4891 is described below

commit 84a48917c4c08df3644c74fe0ba9f6e54d9e7eb6
Author: Liang Zhang <te...@163.com>
AuthorDate: Sat Apr 24 14:39:57 2021 +0800

    Support Transaction in PostgreSQL (#10169)
---
 .../packet/generic/PostgreSQLReadyForQueryPacket.java     | 13 +++++++++++--
 .../packet/generic/PostgreSQLReadyForQueryPacketTest.java | 15 +++++++++++++--
 .../distsql/parser/core/DistSQLVisitor.java               |  3 ++-
 .../parser/api/DistSQLStatementParserEngineTest.java      |  2 +-
 .../rdl/impl/CreateShardingTableRuleBackendHandler.java   |  9 ++++++---
 .../impl/CreateShardingTableRuleBackendHandlerTest.java   |  9 +++++----
 .../authentication/PostgreSQLAuthenticationEngine.java    |  2 +-
 .../command/PostgreSQLCommandExecuteEngine.java           |  9 +++++----
 .../statement/ddl/impl/AlterSessionStatementAssert.java   |  2 +-
 9 files changed, 45 insertions(+), 19 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
index f6c980c..f017877 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/main/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacket.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.db.protocol.postgresql.packet.generic;
 
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLIdentifierTag;
 import org.apache.shardingsphere.db.protocol.postgresql.packet.identifier.PostgreSQLMessagePacketType;
@@ -25,13 +26,21 @@ import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacket
 /**
  * Ready for query packet for PostgreSQL.
  */
+@RequiredArgsConstructor
 public final class PostgreSQLReadyForQueryPacket implements PostgreSQLIdentifierPacket {
     
-    private static final char STATUS = 'I';
+    private static final char IN_TRANSACTION = 'T';
+    
+    private static final char NOT_IN_TRANSACTION = 'I';
+    
+    // TODO consider about TRANSACTION_FAILED
+    private static final char TRANSACTION_FAILED = 'E';
+    
+    private final boolean isInTransaction;
     
     @Override
     public void write(final PostgreSQLPacketPayload payload) {
-        payload.writeInt1(STATUS);
+        payload.writeInt1(isInTransaction ? IN_TRANSACTION : NOT_IN_TRANSACTION);
     }
     
     @Override
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
index db4845a..7c0ecb3 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-postgresql/src/test/java/org/apache/shardingsphere/db/protocol/postgresql/packet/generic/PostgreSQLReadyForQueryPacketTest.java
@@ -29,10 +29,21 @@ import static org.junit.Assert.assertThat;
 public final class PostgreSQLReadyForQueryPacketTest {
     
     @Test
-    public void assertReadWrite() {
+    public void assertReadWriteWithInTransaction() {
         ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1);
         PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf);
-        PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket();
+        PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(true);
+        assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY));
+        packet.write(payload);
+        assertThat(byteBuf.writerIndex(), is(1));
+        assertThat(byteBuf.readByte(), is((byte) 'T'));
+    }
+    
+    @Test
+    public void assertReadWriteWithNotInTransaction() {
+        ByteBuf byteBuf = ByteBufTestUtils.createByteBuf(1);
+        PostgreSQLPacketPayload payload = new PostgreSQLPacketPayload(byteBuf);
+        PostgreSQLReadyForQueryPacket packet = new PostgreSQLReadyForQueryPacket(false);
         assertThat(packet.getIdentifier(), is(PostgreSQLMessagePacketType.READY_FOR_QUERY));
         packet.write(payload);
         assertThat(byteBuf.writerIndex(), is(1));
diff --git a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java
index ac64ce1..7325a95 100644
--- a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java
+++ b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.distsql.parser.core;
 
 import com.google.common.base.Joiner;
+import org.antlr.v4.runtime.tree.ParseTree;
 import org.antlr.v4.runtime.tree.TerminalNode;
 import org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementBaseVisitor;
 import org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.AddResourceContext;
@@ -188,7 +189,7 @@ public final class DistSQLVisitor extends DistSQLStatementBaseVisitor<ASTNode> {
     @Override
     public ASTNode visitCreateShardingBroadcastTableRules(final CreateShardingBroadcastTableRulesContext ctx) {
         CreateShardingBroadcastTableRulesStatement result = new CreateShardingBroadcastTableRulesStatement();
-        result.getTables().addAll(ctx.IDENTIFIER().stream().map(each -> each.getText()).collect(Collectors.toList()));
+        result.getTables().addAll(ctx.IDENTIFIER().stream().map(ParseTree::getText).collect(Collectors.toList()));
         return result;
     }
     
diff --git a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java
index e2f1389..57e3960 100644
--- a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java
+++ b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/test/java/org/apache/shardingsphere/distsql/parser/api/DistSQLStatementParserEngineTest.java
@@ -92,7 +92,7 @@ public final class DistSQLStatementParserEngineTest {
         SQLStatement sqlStatement = engine.parse(RDL_ADD_RESOURCE_MULTIPLE);
         assertTrue(sqlStatement instanceof AddResourceStatement);
         assertThat(((AddResourceStatement) sqlStatement).getDataSources().size(), is(2));
-        List<DataSourceSegment> dataSourceSegments = new ArrayList(((AddResourceStatement) sqlStatement).getDataSources());
+        List<DataSourceSegment> dataSourceSegments = new ArrayList<>(((AddResourceStatement) sqlStatement).getDataSources());
         DataSourceSegment dataSourceSegment = dataSourceSegments.get(0);
         assertThat(dataSourceSegment.getName(), is("ds_0"));
         assertThat(dataSourceSegment.getHostName(), is("127.0.0.1"));
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java
index ce442da..240d62c 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandler.java
@@ -30,12 +30,15 @@ import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.text.SchemaRequiredBackendHandler;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
 import org.apache.shardingsphere.sharding.converter.ShardingRuleStatementConverter;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -83,12 +86,12 @@ public final class CreateShardingTableRuleBackendHandler extends SchemaRequiredB
     
     private Collection<String> getLogicTables(final String schemaName) {
         Optional<ShardingRuleConfiguration> shardingRuleConfiguration = getShardingRuleConfiguration(schemaName);
-        Collection<String> result = new ArrayList<>();
+        Collection<String> result = new LinkedList<>();
         if (!shardingRuleConfiguration.isPresent()) {
             return result;
         }
-        result.addAll(shardingRuleConfiguration.get().getTables().stream().map(each -> each.getLogicTable()).collect(Collectors.toList()));
-        result.addAll(shardingRuleConfiguration.get().getAutoTables().stream().map(each -> each.getLogicTable()).collect(Collectors.toList()));
+        result.addAll(shardingRuleConfiguration.get().getTables().stream().map(ShardingTableRuleConfiguration::getLogicTable).collect(Collectors.toList()));
+        result.addAll(shardingRuleConfiguration.get().getAutoTables().stream().map(ShardingAutoTableRuleConfiguration::getLogicTable).collect(Collectors.toList()));
         return result;
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java
index 6dc5e6a..163b0c4 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateShardingTableRuleBackendHandlerTest.java
@@ -41,6 +41,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -71,9 +72,9 @@ public final class CreateShardingTableRuleBackendHandlerTest {
     private CreateShardingTableRuleBackendHandler handler = new CreateShardingTableRuleBackendHandler(sqlStatement, backendConnection);
     
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         ProxyContext.getInstance().init(metaDataContexts, transactionContexts);
-        when(metaDataContexts.getAllSchemaNames()).thenReturn(Arrays.asList("test"));
+        when(metaDataContexts.getAllSchemaNames()).thenReturn(Collections.singletonList("test"));
         when(metaDataContexts.getMetaData(eq("test"))).thenReturn(shardingSphereMetaData);
         when(shardingSphereMetaData.getRuleMetaData()).thenReturn(ruleMetaData);
     }
@@ -99,7 +100,7 @@ public final class CreateShardingTableRuleBackendHandlerTest {
         TableRuleSegment tableRuleSegment = new TableRuleSegment();
         tableRuleSegment.setLogicTable("t_order");
         when(ruleMetaData.getConfigurations()).thenReturn(buildShardingConfigurations());
-        when(sqlStatement.getTables()).thenReturn(Arrays.asList(tableRuleSegment));
+        when(sqlStatement.getTables()).thenReturn(Collections.singletonList(tableRuleSegment));
         handler.execute("test", sqlStatement);
     }
     
@@ -107,6 +108,6 @@ public final class CreateShardingTableRuleBackendHandlerTest {
         ShardingRuleConfiguration configuration = new ShardingRuleConfiguration();
         configuration.getTables().add(new ShardingTableRuleConfiguration("t_order"));
         configuration.getAutoTables().add(new ShardingAutoTableRuleConfiguration("t_order"));
-        return new ArrayList<>(Arrays.asList(configuration));
+        return new ArrayList<>(Collections.singletonList(configuration));
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
index bcb9912..8df85a5 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/authentication/PostgreSQLAuthenticationEngine.java
@@ -100,7 +100,7 @@ public final class PostgreSQLAuthenticationEngine implements AuthenticationEngin
         context.write(new PostgreSQLParameterStatusPacket("server_version", "12.3"));
         context.write(new PostgreSQLParameterStatusPacket("client_encoding", "UTF8"));
         context.write(new PostgreSQLParameterStatusPacket("server_encoding", "UTF8"));
-        context.writeAndFlush(new PostgreSQLReadyForQueryPacket());
+        context.writeAndFlush(new PostgreSQLReadyForQueryPacket(false));
         return AuthenticationResultBuilder.finished(currentAuthResult.getUsername(), "", currentAuthResult.getDatabase());
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
index a6e5971..8fa1feb 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/PostgreSQLCommandExecuteEngine.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQ
 import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLReadyForQueryPacket;
 import org.apache.shardingsphere.db.protocol.postgresql.payload.PostgreSQLPacketPayload;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.transaction.TransactionHolder;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.frontend.command.CommandExecuteEngine;
@@ -68,7 +69,7 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
     
     @Override
     public Optional<DatabasePacket<?>> getOtherPacket() {
-        return Optional.of(new PostgreSQLReadyForQueryPacket());
+        return Optional.of(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction()));
     }
     
     @Override
@@ -76,11 +77,11 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
                                final BackendConnection backendConnection, final QueryCommandExecutor queryCommandExecutor, final int headerPackagesCount) throws SQLException {
         if (ResponseType.QUERY == queryCommandExecutor.getResponseType() && !context.channel().isActive()) {
             context.write(new PostgreSQLCommandCompletePacket());
-            context.write(new PostgreSQLReadyForQueryPacket());
+            context.write(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction()));
             return;
         }
         if (ResponseType.UPDATE == queryCommandExecutor.getResponseType()) {
-            context.write(new PostgreSQLReadyForQueryPacket());
+            context.write(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction()));
             return;
         }
         int count = 0;
@@ -99,6 +100,6 @@ public final class PostgreSQLCommandExecuteEngine implements CommandExecuteEngin
             }
         }
         context.write(new PostgreSQLCommandCompletePacket());
-        context.write(new PostgreSQLReadyForQueryPacket());
+        context.write(new PostgreSQLReadyForQueryPacket(TransactionHolder.isTransaction()));
     }
 }
diff --git a/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java b/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java
index 237a770..7ea7426 100644
--- a/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java
+++ b/shardingsphere-sql-parser/shardingsphere-sql-parser-test/src/main/java/org/apache/shardingsphere/test/sql/parser/parameterized/asserts/statement/ddl/impl/AlterSessionStatementAssert.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.test.sql.parser.parameterized.jaxb.cases.domain
  * Alter session statement assert.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class AlterSessionStatementAssert {
+public final class AlterSessionStatementAssert {
     
     /**
      * Assert alter session statement is correct with expected parser result.