You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/14 10:06:24 UTC
[shardingsphere] branch master updated: Automatically start a distributed transaction in the JDBC adapter (#20234) (#22146)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a437e238a52 Automatically start a distributed transaction in the JDBC adapter (#20234) (#22146)
a437e238a52 is described below
commit a437e238a529375a93b56ffbd1e3d7d0f4d32cb0
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Mon Nov 14 18:06:17 2022 +0800
Automatically start a distributed transaction in the JDBC adapter (#20234) (#22146)
* Automatically start a distributed transaction, if needed, in the JDBC adapter (#20234)
* Adjust the code order
---
.../statement/ShardingSpherePreparedStatement.java | 72 +++++++++--
.../core/statement/ShardingSphereStatement.java | 133 ++++++++++++++-------
.../transaction/ConnectionTransaction.java | 1 +
3 files changed, 150 insertions(+), 56 deletions(-)
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 9b8df4e199e..b0c6a9c1762 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -84,10 +84,14 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.ConnectionTransaction;
+import org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
import java.sql.ParameterMetaData;
@@ -335,10 +339,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
new RawSQLExecutorCallback(eventBusContext));
return accumulate(executeResults);
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
- cacheStatements(executionGroupContext.getInputGroups());
- return executor.getRegularExecutor().executeUpdate(executionGroupContext,
- executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
+ return isNeedImplicitCommitTransaction(executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -349,6 +350,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
}
}
+ private int useDriverToExecuteUpdate() throws SQLException {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return executor.getRegularExecutor().executeUpdate(executionGroupContext,
+ executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
+ }
+
private int accumulate(final Collection<ExecuteResult> results) {
int result = 0;
for (ExecuteResult each : results) {
@@ -401,10 +409,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
new RawSQLExecutorCallback(eventBusContext));
return executeResults.iterator().next() instanceof QueryResult;
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
- cacheStatements(executionGroupContext.getInputGroups());
- return executor.getRegularExecutor().execute(executionGroupContext,
- executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
+ return isNeedImplicitCommitTransaction(executionContext) ? executeWithImplicitCommitTransaction() : useDriverToExecute();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -430,9 +435,49 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
}
- private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
- return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+ private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
+ ConnectionTransaction connectionTransaction = connection.getConnectionManager().getConnectionTransaction();
+ boolean isInTransaction = connection.getConnectionContext().getTransactionConnectionContext().isInTransaction();
+ SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+ return TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) && !isInTransaction && sqlStatement instanceof DMLStatement
+ && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
+ }
+
+ private boolean executeWithImplicitCommitTransaction() throws SQLException {
+ boolean result;
+ try {
+ connection.setAutoCommit(false);
+ result = useDriverToExecute();
+ connection.commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ connection.rollback();
+ throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+ }
+ return result;
+ }
+
+ private int executeUpdateWithImplicitCommitTransaction() throws SQLException {
+ int result;
+ try {
+ connection.setAutoCommit(false);
+ result = useDriverToExecuteUpdate();
+ connection.commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ connection.rollback();
+ throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+ }
+ return result;
+ }
+
+ private boolean useDriverToExecute() throws SQLException {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ return executor.getRegularExecutor().execute(executionGroupContext,
+ executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
}
private JDBCExecutorCallback<Boolean> createExecuteCallback() {
@@ -453,6 +498,11 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
};
}
+ private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
+ return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+ }
+
@Override
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index faa68c222ee..ca2fad67d12 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -70,7 +70,6 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.route.context.RouteContext;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
@@ -78,10 +77,14 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.ConnectionTransaction;
+import org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -213,7 +216,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(),
new RawSQLExecutorCallback(eventBusContext)).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
cacheStatements(executionGroupContext.getInputGroups());
StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), executionContext.getSqlStatementContext().getSqlStatement(),
@@ -250,10 +253,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
- cacheStatements(executionGroupContext.getInputGroups());
- return executeUpdate(executionGroupContext, (actualSQL, statement) -> statement.executeUpdate(actualSQL),
- executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+ return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL), executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -281,10 +281,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
- cacheStatements(executionGroupContext.getInputGroups());
- return executeUpdate(executionGroupContext, (actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys),
- executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+ return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys), executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -310,10 +307,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroups = createExecutionContext();
- cacheStatements(executionGroups.getInputGroups());
- return executeUpdate(executionGroups, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes),
- executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+ return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes), executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -339,10 +333,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
- cacheStatements(executionGroupContext.getInputGroups());
- return executeUpdate(executionGroupContext, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames),
- executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+ return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames), executionContext.getSqlStatementContext());
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -353,10 +344,36 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
}
}
- private int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteUpdateCallback updater,
- final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits) throws SQLException {
+ private int executeUpdate(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+ return isNeedImplicitCommitTransaction(executionContext) ? executeUpdateWithImplicitCommitTransaction(updater, sqlStatementContext) : useDriverToExecuteUpdate(updater, sqlStatementContext);
+ }
+
+ private int executeUpdateWithImplicitCommitTransaction(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+ int result;
+ try {
+ connection.setAutoCommit(false);
+ result = useDriverToExecuteUpdate(updater, sqlStatementContext);
+ connection.commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ connection.rollback();
+ throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+ }
+ return result;
+ }
+
+ private int useDriverToExecuteUpdate(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ JDBCExecutorCallback<Integer> callback = createExecuteUpdateCallback(updater, sqlStatementContext);
+ return executor.getRegularExecutor().executeUpdate(executionGroupContext,
+ executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), callback);
+ }
+
+ private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) {
boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- JDBCExecutorCallback<Integer> callback = new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatementContext.getSqlStatement(), isExceptionThrown,
eventBusContext) {
@@ -370,7 +387,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return Optional.empty();
}
};
- return executor.getRegularExecutor().executeUpdate(executionGroupContext, executionContext.getQueryContext(), routeUnits, callback);
}
private int accumulate(final Collection<ExecuteResult> results) {
@@ -434,25 +450,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
}
}
- private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteCallback executeCallback,
- final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
- boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
- JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
- metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown, eventBusContext) {
-
- @Override
- protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
- return executeCallback.execute(sql, statement);
- }
-
- @Override
- protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
- return Optional.empty();
- }
- };
- return executor.getRegularExecutor().execute(executionGroupContext, executionContext.getQueryContext(), routeUnits, jdbcExecutorCallback);
- }
-
private boolean execute0(final String sql, final ExecuteCallback callback) throws SQLException {
try {
QueryContext queryContext = createQueryContext(sql);
@@ -473,9 +470,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
Collection<ExecuteResult> results = executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext));
return results.iterator().next() instanceof QueryResult;
}
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
- cacheStatements(executionGroupContext.getInputGroups());
- return execute(executionGroupContext, callback, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits());
+ return isNeedImplicitCommitTransaction(executionContext) ? executeWithImplicitCommitTransaction(callback) : useDriverToExecute(callback);
} finally {
currentResultSet = null;
}
@@ -525,7 +520,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
metaDataContexts.getMetaData().getProps(), connection.getConnectionContext());
}
- private ExecutionGroupContext<JDBCExecutionUnit> createExecutionContext() throws SQLException {
+ private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
}
@@ -536,6 +531,37 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
}
+ private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
+ ConnectionTransaction connectionTransaction = connection.getConnectionManager().getConnectionTransaction();
+ boolean isInTransaction = connection.getConnectionContext().getTransactionConnectionContext().isInTransaction();
+ SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+ return TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) && !isInTransaction && sqlStatement instanceof DMLStatement
+ && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
+ }
+
+ private boolean executeWithImplicitCommitTransaction(final ExecuteCallback callback) throws SQLException {
+ boolean result;
+ try {
+ connection.setAutoCommit(false);
+ result = useDriverToExecute(callback);
+ connection.commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ connection.rollback();
+ throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+ }
+ return result;
+ }
+
+ private boolean useDriverToExecute(final ExecuteCallback callback) throws SQLException {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+ cacheStatements(executionGroupContext.getInputGroups());
+ JDBCExecutorCallback<Boolean> jdbcExecutorCallback = createExecuteCallback(callback, executionContext.getSqlStatementContext().getSqlStatement());
+ return executor.getRegularExecutor().execute(executionGroupContext,
+ executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+ }
+
private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
@@ -543,6 +569,23 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
replay();
}
+ private JDBCExecutorCallback<Boolean> createExecuteCallback(final ExecuteCallback executeCallback, final SQLStatement sqlStatement) {
+ boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
+ return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+ metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown, eventBusContext) {
+
+ @Override
+ protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+ return executeCallback.execute(sql, statement);
+ }
+
+ @Override
+ protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) {
+ return Optional.empty();
+ }
+ };
+ }
+
private void replay() throws SQLException {
for (Statement each : statements) {
getMethodInvocationRecorder().replay(each);
diff --git a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
index 5d8fc653771..4e266d65e16 100644
--- a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
+++ b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
@@ -33,6 +33,7 @@ import java.util.Optional;
*/
public final class ConnectionTransaction {
+ @Getter
private final TransactionType transactionType;
private final String databaseName;