You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/14 10:06:24 UTC

[shardingsphere] branch master updated: Automatically start a distributed transaction in jdbc adapter(#20234) (#22146)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a437e238a52 Automatically start a distributed transaction in jdbc adapter(#20234) (#22146)
a437e238a52 is described below

commit a437e238a529375a93b56ffbd1e3d7d0f4d32cb0
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Mon Nov 14 18:06:17 2022 +0800

    Automatically start a distributed transaction in jdbc adapter(#20234) (#22146)
    
    * Automatically start a distributed transaction if needed in jdbc adapter(#20234)
    
    * Adjust the code order
---
 .../statement/ShardingSpherePreparedStatement.java |  72 +++++++++--
 .../core/statement/ShardingSphereStatement.java    | 133 ++++++++++++++-------
 .../transaction/ConnectionTransaction.java         |   1 +
 3 files changed, 150 insertions(+), 56 deletions(-)

diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 9b8df4e199e..b0c6a9c1762 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -84,10 +84,14 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
 import org.apache.shardingsphere.traffic.engine.TrafficEngine;
 import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.ConnectionTransaction;
+import org.apache.shardingsphere.transaction.core.TransactionType;
 
 import java.sql.Connection;
 import java.sql.ParameterMetaData;
@@ -335,10 +339,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
                         new RawSQLExecutorCallback(eventBusContext));
                 return accumulate(executeResults);
             }
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
-            cacheStatements(executionGroupContext.getInputGroups());
-            return executor.getRegularExecutor().executeUpdate(executionGroupContext,
-                    executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
+            return isNeedImplicitCommitTransaction(executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate();
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -349,6 +350,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         }
     }
     
+    private int useDriverToExecuteUpdate() throws SQLException {
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+        cacheStatements(executionGroupContext.getInputGroups());
+        return executor.getRegularExecutor().executeUpdate(executionGroupContext,
+                executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback());
+    }
+    
     private int accumulate(final Collection<ExecuteResult> results) {
         int result = 0;
         for (ExecuteResult each : results) {
@@ -401,10 +409,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
                         new RawSQLExecutorCallback(eventBusContext));
                 return executeResults.iterator().next() instanceof QueryResult;
             }
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
-            cacheStatements(executionGroupContext.getInputGroups());
-            return executor.getRegularExecutor().execute(executionGroupContext,
-                    executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
+            return isNeedImplicitCommitTransaction(executionContext) ? executeWithImplicitCommitTransaction() : useDriverToExecute();
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -430,9 +435,49 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
                 .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
     }
     
-    private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
-        return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+    private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
+        ConnectionTransaction connectionTransaction = connection.getConnectionManager().getConnectionTransaction();
+        boolean isInTransaction = connection.getConnectionContext().getTransactionConnectionContext().isInTransaction();
+        SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+        return TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) && !isInTransaction && sqlStatement instanceof DMLStatement
+                && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
+    }
+    
+    private boolean executeWithImplicitCommitTransaction() throws SQLException {
+        boolean result;
+        try {
+            connection.setAutoCommit(false);
+            result = useDriverToExecute();
+            connection.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            connection.rollback();
+            throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+        }
+        return result;
+    }
+    
+    private int executeUpdateWithImplicitCommitTransaction() throws SQLException {
+        int result;
+        try {
+            connection.setAutoCommit(false);
+            result = useDriverToExecuteUpdate();
+            connection.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            connection.rollback();
+            throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+        }
+        return result;
+    }
+    
+    private boolean useDriverToExecute() throws SQLException {
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+        cacheStatements(executionGroupContext.getInputGroups());
+        return executor.getRegularExecutor().execute(executionGroupContext,
+                executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
     }
     
     private JDBCExecutorCallback<Boolean> createExecuteCallback() {
@@ -453,6 +498,11 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         };
     }
     
+    private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
+        return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+    }
+    
     @Override
     public ResultSet getResultSet() throws SQLException {
         if (null != currentResultSet) {
diff --git a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index faa68c222ee..ca2fad67d12 100644
--- a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -70,7 +70,6 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.route.context.RouteContext;
-import org.apache.shardingsphere.infra.route.context.RouteUnit;
 import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
@@ -78,10 +77,14 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
 import org.apache.shardingsphere.sqlfederation.spi.SQLFederationExecutorContext;
 import org.apache.shardingsphere.traffic.engine.TrafficEngine;
 import org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.ConnectionTransaction;
+import org.apache.shardingsphere.transaction.core.TransactionType;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -213,7 +216,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             return executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(),
                     new RawSQLExecutorCallback(eventBusContext)).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
         }
-        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
         cacheStatements(executionGroupContext.getInputGroups());
         StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
                 metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), executionContext.getSqlStatementContext().getSqlStatement(),
@@ -250,10 +253,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
             }
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
-            cacheStatements(executionGroupContext.getInputGroups());
-            return executeUpdate(executionGroupContext, (actualSQL, statement) -> statement.executeUpdate(actualSQL),
-                    executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+            return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL), executionContext.getSqlStatementContext());
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -281,10 +281,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
             }
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
-            cacheStatements(executionGroupContext.getInputGroups());
-            return executeUpdate(executionGroupContext, (actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys),
-                    executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+            return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL, autoGeneratedKeys), executionContext.getSqlStatementContext());
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -310,10 +307,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
             }
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroups = createExecutionContext();
-            cacheStatements(executionGroups.getInputGroups());
-            return executeUpdate(executionGroups, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes),
-                    executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+            return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL, columnIndexes), executionContext.getSqlStatementContext());
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -339,10 +333,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
             if (metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
                 return accumulate(executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext)));
             }
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
-            cacheStatements(executionGroupContext.getInputGroups());
-            return executeUpdate(executionGroupContext, (actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames),
-                    executionContext.getSqlStatementContext(), executionContext.getRouteContext().getRouteUnits());
+            return executeUpdate((actualSQL, statement) -> statement.executeUpdate(actualSQL, columnNames), executionContext.getSqlStatementContext());
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
@@ -353,10 +344,36 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         }
     }
     
-    private int executeUpdate(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteUpdateCallback updater,
-                              final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits) throws SQLException {
+    private int executeUpdate(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+        return isNeedImplicitCommitTransaction(executionContext) ? executeUpdateWithImplicitCommitTransaction(updater, sqlStatementContext) : useDriverToExecuteUpdate(updater, sqlStatementContext);
+    }
+    
+    private int executeUpdateWithImplicitCommitTransaction(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+        int result;
+        try {
+            connection.setAutoCommit(false);
+            result = useDriverToExecuteUpdate(updater, sqlStatementContext);
+            connection.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            connection.rollback();
+            throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+        }
+        return result;
+    }
+    
+    private int useDriverToExecuteUpdate(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) throws SQLException {
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+        cacheStatements(executionGroupContext.getInputGroups());
+        JDBCExecutorCallback<Integer> callback = createExecuteUpdateCallback(updater, sqlStatementContext);
+        return executor.getRegularExecutor().executeUpdate(executionGroupContext,
+                executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), callback);
+    }
+    
+    private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final ExecuteUpdateCallback updater, final SQLStatementContext<?> sqlStatementContext) {
         boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        JDBCExecutorCallback<Integer> callback = new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+        return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
                 metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatementContext.getSqlStatement(), isExceptionThrown,
                 eventBusContext) {
             
@@ -370,7 +387,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
                 return Optional.empty();
             }
         };
-        return executor.getRegularExecutor().executeUpdate(executionGroupContext, executionContext.getQueryContext(), routeUnits, callback);
     }
     
     private int accumulate(final Collection<ExecuteResult> results) {
@@ -434,25 +450,6 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         }
     }
     
-    private boolean execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final ExecuteCallback executeCallback,
-                            final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
-        boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
-        JDBCExecutorCallback<Boolean> jdbcExecutorCallback = new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
-                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown, eventBusContext) {
-            
-            @Override
-            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
-                return executeCallback.execute(sql, statement);
-            }
-            
-            @Override
-            protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) {
-                return Optional.empty();
-            }
-        };
-        return executor.getRegularExecutor().execute(executionGroupContext, executionContext.getQueryContext(), routeUnits, jdbcExecutorCallback);
-    }
-    
     private boolean execute0(final String sql, final ExecuteCallback callback) throws SQLException {
         try {
             QueryContext queryContext = createQueryContext(sql);
@@ -473,9 +470,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
                 Collection<ExecuteResult> results = executor.getRawExecutor().execute(createRawExecutionContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback(eventBusContext));
                 return results.iterator().next() instanceof QueryResult;
             }
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionContext();
-            cacheStatements(executionGroupContext.getInputGroups());
-            return execute(executionGroupContext, callback, executionContext.getSqlStatementContext().getSqlStatement(), executionContext.getRouteContext().getRouteUnits());
+            return isNeedImplicitCommitTransaction(executionContext) ? executeWithImplicitCommitTransaction(callback) : useDriverToExecute(callback);
         } finally {
             currentResultSet = null;
         }
@@ -525,7 +520,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
                 metaDataContexts.getMetaData().getProps(), connection.getConnectionContext());
     }
     
-    private ExecutionGroupContext<JDBCExecutionUnit> createExecutionContext() throws SQLException {
+    private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
     }
@@ -536,6 +531,37 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
                 .prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
     }
     
+    private boolean isNeedImplicitCommitTransaction(final ExecutionContext executionContext) {
+        ConnectionTransaction connectionTransaction = connection.getConnectionManager().getConnectionTransaction();
+        boolean isInTransaction = connection.getConnectionContext().getTransactionConnectionContext().isInTransaction();
+        SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
+        return TransactionType.isDistributedTransaction(connectionTransaction.getTransactionType()) && !isInTransaction && sqlStatement instanceof DMLStatement
+                && !(sqlStatement instanceof SelectStatement) && executionContext.getExecutionUnits().size() > 1;
+    }
+    
+    private boolean executeWithImplicitCommitTransaction(final ExecuteCallback callback) throws SQLException {
+        boolean result;
+        try {
+            connection.setAutoCommit(false);
+            result = useDriverToExecute(callback);
+            connection.commit();
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            connection.rollback();
+            throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType());
+        }
+        return result;
+    }
+    
+    private boolean useDriverToExecute(final ExecuteCallback callback) throws SQLException {
+        ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext();
+        cacheStatements(executionGroupContext.getInputGroups());
+        JDBCExecutorCallback<Boolean> jdbcExecutorCallback = createExecuteCallback(callback, executionContext.getSqlStatementContext().getSqlStatement());
+        return executor.getRegularExecutor().execute(executionGroupContext,
+                executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+    }
+    
     private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException {
         for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
             statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
@@ -543,6 +569,23 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         replay();
     }
     
+    private JDBCExecutorCallback<Boolean> createExecuteCallback(final ExecuteCallback executeCallback, final SQLStatement sqlStatement) {
+        boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown();
+        return new JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(),
+                metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown, eventBusContext) {
+            
+            @Override
+            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException {
+                return executeCallback.execute(sql, statement);
+            }
+            
+            @Override
+            protected Optional<Boolean> getSaneResult(final SQLStatement sqlStatement1, final SQLException ex) {
+                return Optional.empty();
+            }
+        };
+    }
+    
     private void replay() throws SQLException {
         for (Statement each : statements) {
             getMethodInvocationRecorder().replay(each);
diff --git a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
index 5d8fc653771..4e266d65e16 100644
--- a/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
+++ b/kernel/transaction/core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java
@@ -33,6 +33,7 @@ import java.util.Optional;
  */
 public final class ConnectionTransaction {
     
+    @Getter
     private final TransactionType transactionType;
     
     private final String databaseName;