You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/12 03:37:36 UTC
[shardingsphere] branch master updated: Fix serial flag judgment error within the transaction(#12052) (#22073)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 59ab85bf749 Fix serial flag judgment error within the transaction(#12052) (#22073)
59ab85bf749 is described below
commit 59ab85bf7499d5c06bd990a50c89f38dd3f1c6f3
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Sat Nov 12 11:37:27 2022 +0800
Fix serial flag judgment error within the transaction(#12052) (#22073)
* Fix
Fix isSerial judgment error within the transaction(#12052)
* Fix
* Fix
---
.../executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java | 6 ++++--
.../infra/executor/sql/execute/engine/raw/RawExecutor.java | 5 +++--
.../infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java | 7 ++++---
.../org/apache/shardingsphere/driver/executor/DriverExecutor.java | 4 ++--
.../jdbc/core/statement/ShardingSpherePreparedStatement.java | 2 +-
.../driver/executor/batch/BatchPreparedStatementExecutorTest.java | 4 ++--
.../proxy/backend/communication/ProxySQLExecutor.java | 7 ++++---
.../communication/jdbc/JDBCDatabaseCommunicationEngine.java | 2 +-
.../proxy/backend/handler/distsql/rul/sql/PreviewHandler.java | 3 ++-
.../command/query/text/query/MySQLMultiStatementsHandler.java | 5 +++--
.../query/extended/bind/OpenGaussComBatchBindExecutorTest.java | 2 ++
.../query/extended/PostgreSQLBatchedStatementsExecutor.java | 3 ++-
.../PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java | 2 ++
.../query/extended/PostgreSQLBatchedStatementsExecutorTest.java | 2 ++
14 files changed, 34 insertions(+), 20 deletions(-)
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java
index 923ab33d7bd..a0d9e876657 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutor.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.context.ConnectionContext;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
@@ -34,7 +35,8 @@ public final class JDBCExecutor {
private final ExecutorEngine executorEngine;
- private final boolean serial;
+ // TODO add transaction type to ConnectionContext
+ private final ConnectionContext connectionContext;
/**
* Execute.
@@ -62,7 +64,7 @@ public final class JDBCExecutor {
public <T> List<T> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final JDBCExecutorCallback<T> firstCallback, final JDBCExecutorCallback<T> callback) throws SQLException {
try {
- return executorEngine.execute(executionGroupContext, firstCallback, callback, serial);
+ return executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionConnectionContext().isInTransaction());
} catch (final SQLException ex) {
SQLExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index 7ae30fcbbc2..df48b64d605 100644
--- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.executor.sql.execute.engine.raw;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.QueryContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.context.ConnectionContext;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -42,7 +43,7 @@ public final class RawExecutor {
private final ExecutorEngine executorEngine;
- private final boolean serial;
+ private final ConnectionContext connectionContext;
private final ConfigurationProperties props;
@@ -75,7 +76,7 @@ public final class RawExecutor {
private <T> List<T> execute(final ExecutionGroupContext<RawSQLExecutionUnit> executionGroupContext,
final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
try {
- return (List<T>) executorEngine.execute(executionGroupContext, firstCallback, callback, serial);
+ return (List<T>) executorEngine.execute(executionGroupContext, firstCallback, callback, connectionContext.getTransactionConnectionContext().isInTransaction());
} catch (final SQLException ex) {
SQLExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
diff --git a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java
index b336e17f9d5..d5d428a2eb2 100644
--- a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java
+++ b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.executor.sql.execute.engine.jdbc;
+import org.apache.shardingsphere.infra.context.ConnectionContext;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -45,7 +46,7 @@ public final class JDBCExecutorTest {
ExecutionGroup<JDBCExecutionUnit> group = new ExecutionGroup<>(Collections.singletonList(mock(JDBCExecutionUnit.class)));
ExecutionGroupContext context = new ExecutionGroupContext(Collections.singletonList(group));
when(executorEngine.execute(any(), any(), any(), anyBoolean())).thenReturn(Collections.singletonList("test"));
- JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, false);
+ JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, new ConnectionContext());
List<?> actual1 = jdbcExecutor.execute(context, null);
assertThat(actual1, is(Collections.singletonList("test")));
List<?> actual2 = jdbcExecutor.execute(context, null, null);
@@ -57,7 +58,7 @@ public final class JDBCExecutorTest {
try {
ExecutorEngine executorEngine = mock(ExecutorEngine.class);
when(executorEngine.execute(new ExecutionGroupContext<>(anyCollection()), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
- JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, false);
+ JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, new ConnectionContext());
jdbcExecutor.execute(new ExecutionGroupContext<>(Collections.emptyList()), null);
} catch (final SQLException ex) {
assertThat(ex.getMessage(), is("TestSQLException"));
@@ -68,7 +69,7 @@ public final class JDBCExecutorTest {
public void assertExecuteNotThrownSQLException() throws SQLException {
ExecutorEngine executorEngine = mock(ExecutorEngine.class);
when(executorEngine.execute(new ExecutionGroupContext<>(anyCollection()), any(), any(), anyBoolean())).thenThrow(new SQLException("TestSQLException"));
- JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, false);
+ JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, new ConnectionContext());
SQLExecutorExceptionHandler.setExceptionThrown(false);
List<?> actual = jdbcExecutor.execute(new ExecutionGroupContext<>(Collections.emptyList()), null);
assertThat(actual, is(Collections.emptyList()));
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 177bd3b881e..261e01a07ab 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -50,9 +50,9 @@ public final class DriverExecutor implements AutoCloseable {
MetaDataContexts metaDataContexts = connection.getContextManager().getMetaDataContexts();
ExecutorEngine executorEngine = connection.getContextManager().getExecutorEngine();
EventBusContext eventBusContext = connection.getContextManager().getInstanceContext().getEventBusContext();
- JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connection.isHoldTransaction());
+ JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, connection.getConnectionContext());
regularExecutor = new DriverJDBCExecutor(connection.getDatabaseName(), connection.getContextManager(), jdbcExecutor);
- rawExecutor = new RawExecutor(executorEngine, connection.isHoldTransaction(), metaDataContexts.getMetaData().getProps(), eventBusContext);
+ rawExecutor = new RawExecutor(executorEngine, connection.getConnectionContext(), metaDataContexts.getMetaData().getProps(), eventBusContext);
DatabaseType protocolType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType();
String schemaName = DatabaseTypeEngine.getDefaultSchemaName(protocolType, connection.getDatabaseName());
SQLFederationRule sqlFederationRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index f4a52ca6bb3..fc53e4a587e 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -198,7 +198,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
executor = new DriverExecutor(connection);
- JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.isHoldTransaction());
+ JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getConnectionContext());
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getDatabaseName(), eventBusContext);
kernelProcessor = new KernelProcessor();
statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData());
diff --git a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
index 2bc9c936738..4e4911b8fdf 100644
--- a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
+++ b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
@@ -63,8 +63,8 @@ public final class BatchPreparedStatementExecutorTest extends AbstractBaseExecut
@Override
public void setUp() throws SQLException {
super.setUp();
- actual = spy(new BatchPreparedStatementExecutor(getConnection().getContextManager().getMetaDataContexts(), new JDBCExecutor(getExecutorEngine(), false), DefaultDatabase.LOGIC_NAME,
- new EventBusContext()));
+ actual = spy(new BatchPreparedStatementExecutor(getConnection().getContextManager().getMetaDataContexts(), new JDBCExecutor(getExecutorEngine(), getConnection().getConnectionContext()),
+ DefaultDatabase.LOGIC_NAME, new EventBusContext()));
when(sqlStatementContext.getTablesContext()).thenReturn(mock(TablesContext.class));
}
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 8754a675ab9..84698306bac 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.proxy.backend.communication;
import org.apache.shardingsphere.dialect.exception.transaction.TableModifyInTransactionException;
import org.apache.shardingsphere.infra.binder.type.TableAvailable;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.ConnectionContext;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
@@ -81,10 +82,10 @@ public final class ProxySQLExecutor {
this.type = type;
this.backendConnection = backendConnection;
ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
- boolean isSerialExecute = backendConnection.isSerialExecute();
MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
- jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection.getConnectionSession(), databaseCommunicationEngine, new JDBCExecutor(executorEngine, isSerialExecute));
- rawExecutor = new RawExecutor(executorEngine, isSerialExecute, metaDataContexts.getMetaData().getProps(), ProxyContext.getInstance().getContextManager().getInstanceContext()
+ ConnectionContext connectionContext = backendConnection.getConnectionSession().getConnectionContext();
+ jdbcExecutor = new ProxyJDBCExecutor(type, backendConnection.getConnectionSession(), databaseCommunicationEngine, new JDBCExecutor(executorEngine, connectionContext));
+ rawExecutor = new RawExecutor(executorEngine, connectionContext, metaDataContexts.getMetaData().getProps(), ProxyContext.getInstance().getContextManager().getInstanceContext()
.getEventBusContext());
}
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 8edb5996836..62c14f6c7bf 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -148,7 +148,7 @@ public final class JDBCDatabaseCommunicationEngine extends DatabaseCommunication
String schemaName = getQueryContext().getSqlStatementContext().getTablesContext().getSchemaName().orElseGet(() -> DatabaseTypeEngine.getDefaultSchemaName(databaseType, databaseName));
SQLFederationRule sqlFederationRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
federationExecutor = sqlFederationRule.getSQLFederationExecutor(databaseName, schemaName, metaDataContexts.getMetaData(), metaDataContexts.getShardingSphereData(),
- new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), backendConnection.isSerialExecute()),
+ new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), backendConnection.getConnectionSession().getConnectionContext()),
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext());
}
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
index 05e2db78d4b..cb591e32f59 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/sql/PreviewHandler.java
@@ -145,7 +145,8 @@ public final class PreviewHandler extends SQLRULBackendHandler<PreviewStatement>
EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
SQLFederationRule sqlFederationRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
SQLFederationExecutor sqlFederationExecutor = sqlFederationRule.getSQLFederationExecutor(databaseName, schemaName,
- metaDataContexts.getMetaData(), metaDataContexts.getShardingSphereData(), new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), false), eventBusContext);
+ metaDataContexts.getMetaData(), metaDataContexts.getShardingSphereData(),
+ new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), getConnectionSession().getConnectionContext()), eventBusContext);
sqlFederationExecutor.executeQuery(prepareEngine, createPreviewFederationCallback(database.getProtocolType(), database.getResourceMetaData().getStorageTypes(), sqlStatement, eventBusContext),
context);
return context.getExecutionUnits();
diff --git a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
index fbcc45b4f24..3380f612f33 100644
--- a/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
+++ b/proxy/frontend/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLMultiStatementsHandler.java
@@ -77,7 +77,7 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
private final KernelProcessor kernelProcessor = new KernelProcessor();
- private final JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), false);
+ private final JDBCExecutor jdbcExecutor;
private final ConnectionSession connectionSession;
@@ -89,7 +89,8 @@ public final class MySQLMultiStatementsHandler implements ProxyBackendHandler {
private ExecutionContext anyExecutionContext;
- public MySQLMultiStatementsHandler(final ConnectionSession connectionSession, final SQLStatement sqlStatementSample, final String sql) throws SQLException {
+ public MySQLMultiStatementsHandler(final ConnectionSession connectionSession, final SQLStatement sqlStatementSample, final String sql) {
+ jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionSession.getConnectionContext());
connectionSession.getBackendConnection().handleAutoCommit();
this.connectionSession = connectionSession;
this.sqlStatementSample = sqlStatementSample;
diff --git a/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java b/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
index 6df48210d0e..9a5ae65edee 100644
--- a/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
+++ b/proxy/frontend/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQ
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.ConnectionContext;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
@@ -92,6 +93,7 @@ public final class OpenGaussComBatchBindExecutorTest extends ProxyContextRestore
when(packet.getStatementId()).thenReturn("S_1");
when(packet.readParameterSets(anyList())).thenReturn(Collections.singletonList(Collections.singletonList(0)));
ConnectionSession connectionSession = mock(ConnectionSession.class);
+ when(connectionSession.getConnectionContext()).thenReturn(new ConnectionContext());
when(connectionSession.getDatabaseName()).thenReturn("db");
when(connectionSession.getConnectionId()).thenReturn(1);
JDBCBackendConnection backendConnection = mock(JDBCBackendConnection.class);
diff --git a/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java b/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
index 568fcec9bea..640b1f41b5e 100644
--- a/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
+++ b/proxy/frontend/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutor.java
@@ -69,7 +69,7 @@ public final class PostgreSQLBatchedStatementsExecutor {
private final KernelProcessor kernelProcessor = new KernelProcessor();
- private final JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), false);
+ private final JDBCExecutor jdbcExecutor;
private final ConnectionSession connectionSession;
@@ -84,6 +84,7 @@ public final class PostgreSQLBatchedStatementsExecutor {
private ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext;
public PostgreSQLBatchedStatementsExecutor(final ConnectionSession connectionSession, final PostgreSQLServerPreparedStatement preparedStatement, final List<List<Object>> parameterSets) {
+ jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionSession.getConnectionContext());
this.connectionSession = connectionSession;
metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
this.preparedStatement = preparedStatement;
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
index 2ecf5b492b2..1b12f8573b7 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQ
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.ConnectionContext;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
@@ -96,6 +97,7 @@ public final class PostgreSQLAggregatedBatchedStatementsCommandExecutorTest exte
when(globalRuleMetaData.getSingleRule(SQLTranslatorRule.class)).thenReturn(new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()));
ConnectionSession connectionSession = mock(ConnectionSession.class);
when(connectionSession.getDatabaseName()).thenReturn("db");
+ when(connectionSession.getConnectionContext()).thenReturn(new ConnectionContext());
when(connectionSession.getServerPreparedStatementRegistry()).thenReturn(new ServerPreparedStatementRegistry());
ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
when(database.getResourceMetaData().getAllInstanceDataSourceNames()).thenReturn(Collections.singletonList("ds_0"));
diff --git a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
index 7e3d2c7c945..cb1f34cbe17 100644
--- a/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
+++ b/proxy/frontend/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.ext
import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.extended.bind.PostgreSQLTypeUnspecifiedSQLParameter;
import org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.ConnectionContext;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
@@ -86,6 +87,7 @@ public final class PostgreSQLBatchedStatementsExecutorTest extends ProxyContextR
when(connectionSession.getDatabaseName()).thenReturn("db");
when(connectionSession.getBackendConnection()).thenReturn(backendConnection);
when(connectionSession.getStatementManager()).thenReturn(backendStatement);
+ when(connectionSession.getConnectionContext()).thenReturn(new ConnectionContext());
ProxyContext.init(contextManager);
when(contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE)).thenReturn(1);
when(contextManager.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY)).thenReturn(1);