You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/06/09 09:24:35 UTC

[shardingsphere] branch master updated: optimize calcite parameter pass (#10731)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba3f1d  optimize calcite parameter pass (#10731)
7ba3f1d is described below

commit 7ba3f1dd7ed39dce8ae00521f9a9b484e359744a
Author: Zhengqiang Duan <st...@gmail.com>
AuthorDate: Wed Jun 9 17:24:02 2021 +0800

    optimize calcite parameter pass (#10731)
---
 .../sql/federate/execute/FederateExecutor.java     | 10 ++++---
 .../sql/federate/execute/FederateJDBCExecutor.java | 33 ++++++++--------------
 .../federate/execute/raw/FederateRawExecutor.java  |  6 ++--
 .../federate/schema/row/FederateRowExecutor.java   | 23 ++-------------
 .../statement/ShardingSpherePreparedStatement.java | 15 ++++++----
 .../core/statement/ShardingSphereStatement.java    | 15 ++++++----
 .../backend/communication/ProxySQLExecutor.java    | 14 ++++++---
 7 files changed, 52 insertions(+), 64 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
index f646d34..6991346 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateExecutor.java
@@ -18,11 +18,13 @@
 package org.apache.shardingsphere.infra.executor.sql.federate.execute;
 
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
@@ -37,12 +39,12 @@ public interface FederateExecutor {
      *
      * @param executionContext execution context
      * @param callback callback
-     * @param type JDBC driver type
-     * @param statementOption statement option
+     * @param prepareEngine prepare engine
      * @return execute result
      * @throws SQLException SQL exception
      */
-    List<QueryResult> executeQuery(ExecutionContext executionContext, JDBCExecutorCallback<? extends ExecuteResult> callback, String type, StatementOption statementOption) throws SQLException;
+    List<QueryResult> executeQuery(ExecutionContext executionContext, JDBCExecutorCallback<? extends ExecuteResult> callback, 
+                                   DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException;
     
     /**
      * Close.
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
index 84eed4d..9905ce3 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/FederateJDBCExecutor.java
@@ -22,6 +22,7 @@ import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
@@ -29,10 +30,8 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
 import org.apache.shardingsphere.infra.executor.sql.federate.schema.FederateLogicSchema;
 import org.apache.shardingsphere.infra.executor.sql.federate.schema.row.FederateRowExecutor;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.sql.parser.sql.common.util.SQLUtil;
 
 import java.sql.Connection;
@@ -41,7 +40,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -59,12 +57,8 @@ public final class FederateJDBCExecutor implements FederateExecutor {
     
     private final OptimizeContextFactory factory;
     
-    private final Collection<ShardingSphereRule> rules;
-    
     private final ConfigurationProperties props;
     
-    private final ExecutorJDBCManager jdbcManager;
-    
     private final JDBCExecutor jdbcExecutor;
     
     private Statement statement;
@@ -77,20 +71,17 @@ public final class FederateJDBCExecutor implements FederateExecutor {
         }
     }
     
-    public FederateJDBCExecutor(final String schema, final OptimizeContextFactory factory, final Collection<ShardingSphereRule> rules, 
-                                final ConfigurationProperties props, final ExecutorJDBCManager jdbcManager, final JDBCExecutor jdbcExecutor) {
+    public FederateJDBCExecutor(final String schema, final OptimizeContextFactory factory, final ConfigurationProperties props, final JDBCExecutor jdbcExecutor) {
         this.schema = schema;
         this.factory = factory;
-        this.rules = rules;
         this.props = props;
-        this.jdbcManager = jdbcManager;
         this.jdbcExecutor = jdbcExecutor;
     }
     
     @Override
     public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
-                                          final String type, final StatementOption statementOption) throws SQLException {
-        QueryResult result = new JDBCStreamQueryResult(execute(executionContext, callback, type, statementOption));
+                                          final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
+        QueryResult result = new JDBCStreamQueryResult(execute(executionContext, callback, prepareEngine));
         return Collections.singletonList(result);
     }
     
@@ -109,19 +100,19 @@ public final class FederateJDBCExecutor implements FederateExecutor {
     }
     
     private ResultSet execute(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
-                              final String type, final StatementOption statementOption) throws SQLException {
+                              final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
         SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
-        PreparedStatement statement = getConnection(executionContext, callback, type, statementOption).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
+        PreparedStatement statement = getConnection(executionContext, callback, prepareEngine).prepareStatement(SQLUtil.trimSemicolon(sqlUnit.getSql()));
         setParameters(statement, sqlUnit.getParameters());
         this.statement = statement;
         return statement.executeQuery();
     }
     
     private Connection getConnection(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
-                                     final String type, final StatementOption statementOption) throws SQLException {
+                                     final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
         Connection result = DriverManager.getConnection(CONNECTION_URL, getProperties());
         CalciteConnection calciteConnection = result.unwrap(CalciteConnection.class);
-        addSchema(calciteConnection, executionContext, callback, type, statementOption);
+        addSchema(calciteConnection, executionContext, callback, prepareEngine);
         return result;
     }
     
@@ -132,9 +123,9 @@ public final class FederateJDBCExecutor implements FederateExecutor {
         return result;
     }
     
-    private void addSchema(final CalciteConnection calciteConnection, final ExecutionContext executionContext, 
-                           final JDBCExecutorCallback<? extends ExecuteResult> callback, final String type, final StatementOption statementOption) throws SQLException {
-        FederateRowExecutor executor = new FederateRowExecutor(rules, props, jdbcManager, jdbcExecutor, executionContext, callback, type, statementOption);
+    private void addSchema(final CalciteConnection calciteConnection, final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
+                           final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
+        FederateRowExecutor executor = new FederateRowExecutor(props, jdbcExecutor, executionContext, callback, prepareEngine);
         FederateLogicSchema logicSchema = new FederateLogicSchema(factory.getSchemaMetadatas().getSchemaMetadataBySchemaName(schema), executor);
         calciteConnection.getRootSchema().add(schema, logicSchema);
         calciteConnection.setSchema(schema);
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
index 33721cd..690db51 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/execute/raw/FederateRawExecutor.java
@@ -25,14 +25,16 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.federate.execute.FederateExecutor;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.optimize.ShardingSphereOptimizer;
 import org.apache.shardingsphere.infra.optimize.context.OptimizeContext;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collections;
@@ -52,7 +54,7 @@ public final class FederateRawExecutor implements FederateExecutor {
     
     @Override
     public List<QueryResult> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<? extends ExecuteResult> callback, 
-                                          final String type, final StatementOption statementOption) throws SQLException {
+                                          final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) throws SQLException {
         // TODO
         return Collections.emptyList();
     }
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
index efd50ef..81f3942 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/federate/schema/row/FederateRowExecutor.java
@@ -21,11 +21,9 @@ import lombok.RequiredArgsConstructor;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.rex.RexNode;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
-import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
@@ -34,10 +32,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryRe
 import org.apache.shardingsphere.infra.executor.sql.federate.schema.table.generator.FederateExecutionContextGenerator;
 import org.apache.shardingsphere.infra.executor.sql.federate.schema.table.generator.FederateExecutionSQLGenerator;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -51,22 +46,15 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public final class FederateRowExecutor {
     
-    // TODO Consider use emptyList
-    private final Collection<ShardingSphereRule> rules;
-    
     private final ConfigurationProperties props;
     
-    private final ExecutorJDBCManager jdbcManager;
-    
     private final JDBCExecutor jdbcExecutor;
     
     private final ExecutionContext routeExecutionContext;
     
     private final JDBCExecutorCallback<? extends ExecuteResult> callback;
     
-    private final String type;
-    
-    private final StatementOption statementOption;
+    private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine;
     
     /**
      * Execute.
@@ -84,7 +72,7 @@ public final class FederateRowExecutor {
     
     private Collection<QueryResult> execute(final ExecutionContext context) {
         try {
-            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(context);
+            ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits());
             ExecuteProcessEngine.initialize(context.getSqlStatementContext(), executionGroupContext, props);
             Collection<QueryResult> result = jdbcExecutor.execute(executionGroupContext, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
             ExecuteProcessEngine.finish(executionGroupContext.getExecutionID());
@@ -95,11 +83,4 @@ public final class FederateRowExecutor {
             ExecuteProcessEngine.clean();
         }
     }
-    
-    private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext(final ExecutionContext executionContext) throws SQLException {
-        int maxConnectionsSizePerQuery = props.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        Collection<ExecutionUnit> executionUnits = executionContext.getExecutionUnits();
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(type, maxConnectionsSizePerQuery, jdbcManager, statementOption, rules);
-        return prepareEngine.prepare(executionContext.getRouteContext(), executionUnits);
-    }
 }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index ad98836..12d0f8b 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -165,8 +165,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         driverJDBCExecutor = new DriverJDBCExecutor(metaDataContexts, jdbcExecutor);
         rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
         // TODO Consider FederateRawExecutor
-        federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME, metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
-                metaDataContexts.getProps(), connection, jdbcExecutor);
+        federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME, metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getProps(), jdbcExecutor);
         batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor);
         kernelProcessor = new KernelProcessor();
     }
@@ -214,7 +213,13 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         }
         PreparedStatementExecuteQueryCallback callback = new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(), 
                  sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
-        return federateExecutor.executeQuery(executionContext, callback, JDBCDriverType.PREPARED_STATEMENT, statementOption);
+        return federateExecutor.executeQuery(executionContext, callback, createDriverExecutionPrepareEngine());
+    }
+    
+    private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
+        int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection, 
+                statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());
     }
     
     @Override
@@ -285,9 +290,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     }
     
     private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException {
-        int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
-                JDBCDriverType.PREPARED_STATEMENT, maxConnectionsSizePerQuery, connection, statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
     }
     
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 396ca52..aa14e2d 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -134,8 +134,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         driverJDBCExecutor = new DriverJDBCExecutor(metaDataContexts, jdbcExecutor);
         rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
         // TODO Consider FederateRawExecutor
-        federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME, metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
-                metaDataContexts.getProps(), connection, jdbcExecutor);
+        federateExecutor = new FederateJDBCExecutor(DefaultSchema.LOGIC_NAME, metaDataContexts.getOptimizeContextFactory(), metaDataContexts.getProps(), jdbcExecutor);
         kernelProcessor = new KernelProcessor();
     }
     
@@ -185,7 +184,13 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         }
         StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
                 executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
-        return federateExecutor.executeQuery(executionContext, callback, JDBCDriverType.STATEMENT, statementOption);
+        return federateExecutor.executeQuery(executionContext, callback, createDriverExecutionPrepareEngine());
+    }
+    
+    private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine() {
+        int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection, 
+                statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());
     }
     
     @Override
@@ -405,9 +410,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
     }
     
     private ExecutionGroupContext<JDBCExecutionUnit> createExecutionContext() throws SQLException {
-        int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
-                JDBCDriverType.STATEMENT, maxConnectionsSizePerQuery, connection, statementOption, metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules());
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 68fbf60..fba5049 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -82,9 +82,8 @@ public final class ProxySQLExecutor {
         MetaDataContexts metaDataContexts = ProxyContext.getInstance().getMetaDataContexts();
         rawExecutor = new RawExecutor(executorEngine, isSerialExecute, metaDataContexts.getProps());
         // TODO Consider FederateRawExecutor
-        federateExecutor = new FederateJDBCExecutor(backendConnection.getSchemaName(), metaDataContexts.getOptimizeContextFactory(), 
-                metaDataContexts.getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules(), 
-                metaDataContexts.getProps(), backendConnection, new JDBCExecutor(executorEngine, isSerialExecute));
+        federateExecutor = new FederateJDBCExecutor(backendConnection.getSchemaName(), metaDataContexts.getOptimizeContextFactory(),
+                metaDataContexts.getProps(), new JDBCExecutor(executorEngine, isSerialExecute));
     }
     
     /**
@@ -148,7 +147,14 @@ public final class ProxySQLExecutor {
         ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(type, metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(), 
                 executionContext.getSqlStatementContext().getSqlStatement(), backendConnection, isReturnGeneratedKeys, isExceptionThrown, true);
         backendConnection.setFederateExecutor(federateExecutor);
-        return federateExecutor.executeQuery(executionContext, callback, type, new StatementOption(isReturnGeneratedKeys)).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, metaData);
+        return federateExecutor.executeQuery(executionContext, callback, prepareEngine).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
+    }
+    
+    private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> createDriverExecutionPrepareEngine(final boolean isReturnGeneratedKeys, final MetaDataContexts metaData) {
+        int maxConnectionsSizePerQuery = metaData.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        return new DriverExecutionPrepareEngine<>(type, maxConnectionsSizePerQuery, backendConnection, new StatementOption(isReturnGeneratedKeys), 
+                metaData.getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules());
     }
     
     private Collection<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,