You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2021/01/03 12:46:10 UTC

[shardingsphere] branch master updated: create and use CalciteExecutor in Statements (#8866)

This is an automated email from the ASF dual-hosted git repository.

wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f1c9bae  create and use CalciteExecutor in Statements (#8866)
f1c9bae is described below

commit f1c9baeb70fdcf798ef55d74046a48f6bc4d1dd4
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Sun Jan 3 20:45:49 2021 +0800

    create and use CalciteExecutor in Statements (#8866)
    
    * create and use CalciteExecutor
    
    * null == executor?
    
    * add fun
---
 .../infra/optimize/execute/CalciteExecutor.java    | 46 ++++++++++++++++++++++
 .../optimize/execute/CalciteJDBCExecutor.java      | 25 ++++++------
 .../infra/optimize/execute/CalciteRawExecutor.java | 23 +++++------
 .../driver/executor/DriverJDBCExecutor.java        |  2 +
 .../jdbc/adapter/AbstractStatementAdapter.java     | 11 ++++++
 .../statement/ShardingSpherePreparedStatement.java | 33 ++++++++++++++++
 .../core/statement/ShardingSphereStatement.java    | 31 +++++++++++++++
 .../statement/CircuitBreakerPreparedStatement.java |  6 +++
 8 files changed, 153 insertions(+), 24 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteExecutor.java
new file mode 100644
index 0000000..6e17098
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteExecutor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.optimize.execute;
+
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Common contract of the Calcite-based executors: run a query and close the executor afterwards.
+ */
+public interface CalciteExecutor {
+    
+    /**
+     * Execute a query and return its results.
+     *
+     * @param sql SQL text of the query to execute
+     * @param parameters parameter values passed alongside the SQL text
+     * @return query results
+     * @throws SQLException if the query cannot be executed
+     */
+    List<QueryResult> executeQuery(String sql, List<Object> parameters) throws SQLException;
+    
+    /**
+     * Close this executor.
+     * 
+     * @throws SQLException if closing fails
+     */
+    void close() throws SQLException;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
index e81f3ff..5e8b0e5 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
@@ -29,7 +29,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Collection;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -37,7 +37,7 @@ import java.util.Properties;
 /**
  * Calcite JDBC executor.
  */
-public final class CalciteJDBCExecutor {
+public final class CalciteJDBCExecutor implements CalciteExecutor {
     
     public static final String CONNECTION_URL = "jdbc:calcite:";
     
@@ -47,6 +47,8 @@ public final class CalciteJDBCExecutor {
     
     private final CalciteContext context;
     
+    private Statement statement;
+    
     static {
         try {
             Class.forName(DRIVER_NAME);
@@ -61,22 +63,23 @@ public final class CalciteJDBCExecutor {
         PROPERTIES.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), context.getConnectionProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
     }
     
-    /**
-     * Execute query.
-     *
-     * @param sql sql
-     * @param parameters parameters
-     * @return execute result
-     * @throws SQLException SQL exception
-     */
-    public Collection<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
+    @Override
+    public List<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
         QueryResult result = new JDBCStreamQueryResult(execute(sql, parameters));
         return Collections.singletonList(result);
     }
     
+    @Override
+    public void close() throws SQLException {
+        try (Connection connection = null == statement ? null : statement.getConnection(); Statement executed = statement) {
+            // Nothing to run here: null resources are skipped (no query was prepared), and the statement is closed before its connection.
+        }
+    }
+    
     private ResultSet execute(final String sql, final List<Object> parameters) throws SQLException {
         PreparedStatement statement = getConnection().prepareStatement(sql);
         setParameters(statement, parameters);
+        this.statement = statement;
         return statement.executeQuery();
     }
     
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java
index e019a78..3b6fa78 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java
@@ -29,8 +29,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.optimize.context.CalciteContext;
 import org.apache.shardingsphere.infra.optimize.context.CalciteDataContext;
 
@@ -42,23 +41,21 @@ import java.util.List;
  * Calcite raw executor.
  */
 @RequiredArgsConstructor
-public final class CalciteRawExecutor {
+public final class CalciteRawExecutor implements CalciteExecutor {
     
     private final CalciteContext context;
     
-    /**
-     * Execute query.
-     *
-     * @param executionContext execution context
-     * @param callback JDBC execute callback
-     * @param <T> class type of return value
-     * @return execute result
-     * @throws SQLException SQL exception
-     */
-    public <T> List<T> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<T> callback) throws SQLException {
+    @Override
+    public List<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
+        // TODO Not implemented yet: the SQL and parameters are ignored and an empty result list is returned.
         return Collections.emptyList();
     }
     
+    @Override
+    public void close() throws SQLException {
+        // TODO Currently a no-op: nothing is released here yet.
+    }
+    
     private Enumerable<Object[]> execute(final String sql) throws SqlParseException {
         // TODO The below will be replaced by SqlNodeConverter.
         SqlNode sqlNode = SqlParser.create(sql, context.getParserConfig()).parseQuery();
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 1bab80e..1a8db30 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.driver.executor;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
@@ -67,6 +68,7 @@ public final class DriverJDBCExecutor {
     
     private final MetaDataContexts metaDataContexts;
     
+    @Getter
     private final JDBCExecutor jdbcExecutor;
     
     /**
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 19960c0..b0f9b83 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.driver.jdbc.adapter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationStatement;
 import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
 
 import java.sql.SQLException;
 import java.sql.SQLWarning;
@@ -50,11 +51,19 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
         closed = true;
         try {
             forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
+            closeCalciteExecutor();
         } finally {
             getRoutedStatements().clear();
         }
     }
     
+    private void closeCalciteExecutor() throws SQLException {
+        CalciteExecutor executor = getCalciteExecutor();
+        if (null != executor) {
+            executor.close();
+        }
+    }
+    
     @Override
     public final boolean isClosed() {
         return closed;
@@ -203,4 +212,6 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
     protected abstract boolean isAccumulate();
     
     protected abstract Collection<? extends Statement> getRoutedStatements();
+    
+    protected abstract CalciteExecutor getCalciteExecutor();
 }
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 6c7d6a3..aef17db 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -18,11 +18,13 @@
 package org.apache.shardingsphere.driver.jdbc.core.statement;
 
 import com.google.common.base.Strings;
+import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
 import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
 import org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
 import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
+import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
 import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.driver.jdbc.core.constant.SQLExceptionConstant;
@@ -38,10 +40,12 @@ import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementConte
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -62,6 +66,9 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteJDBCExecutor;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
 import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
@@ -77,6 +84,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
@@ -119,6 +127,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
     
     private ResultSet currentResultSet;
     
+    @Getter(AccessLevel.PROTECTED)
+    private CalciteExecutor calciteExecutor;
+    
     public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
         this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
     }
@@ -178,12 +189,34 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
             return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
         }
+        if (executionContext.getRouteContext().isToCalcite()) {
+            return executeQueryByCalcite();
+        }
         Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
         cacheStatements(executionGroups);
         return driverJDBCExecutor.executeQuery(executionGroups, 
                 new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()));
     }
     
+    private List<QueryResult> executeQueryByCalcite() throws SQLException {
+        if (executionContext.getExecutionUnits().isEmpty()) {
+            return Collections.emptyList();
+        }
+        calciteExecutor = createCalciteExecutor();
+        SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+        return calciteExecutor.executeQuery(sqlUnit.getSql(), sqlUnit.getParameters());
+    }
+    
+    private CalciteExecutor createCalciteExecutor() {
+        int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+                executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
+        CalciteInternalExecutor executor = new CalciteInternalExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+                maxConnectionsSizePerQuery, connection, driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
+        // TODO Consider CalciteRawExecutor
+        return new CalciteJDBCExecutor(metaDataContexts.getCalciteContextFactory().create(DefaultSchema.LOGIC_NAME, executor));
+    }
+    
     @Override
     public int executeUpdate() throws SQLException {
         try {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 905c274..6d50251 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.driver.jdbc.core.statement;
 
 import com.google.common.base.Strings;
+import lombok.AccessLevel;
 import lombok.Getter;
 import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
 import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
@@ -37,10 +38,12 @@ import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementConte
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -61,6 +64,9 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteJDBCExecutor;
 import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
 import org.apache.shardingsphere.infra.route.context.RouteUnit;
 import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
@@ -107,6 +113,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
     
     private ResultSet currentResultSet;
     
+    @Getter(AccessLevel.PROTECTED)
+    private CalciteExecutor calciteExecutor;
+    
     public ShardingSphereStatement(final ShardingSphereConnection connection) {
         this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
     }
@@ -149,6 +158,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
             return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
         }
+        if (executionContext.getRouteContext().isToCalcite()) {
+            return executeQueryByCalcite();
+        }
         Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
         cacheStatements(executionGroups);
         StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
@@ -156,6 +168,25 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         return driverJDBCExecutor.executeQuery(executionGroups, callback);
     }
     
+    private List<QueryResult> executeQueryByCalcite() throws SQLException {
+        if (executionContext.getExecutionUnits().isEmpty()) {
+            return Collections.emptyList();
+        }
+        calciteExecutor = createCalciteExecutor();
+        SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+        return calciteExecutor.executeQuery(sqlUnit.getSql(), sqlUnit.getParameters());
+    }
+    
+    private CalciteExecutor createCalciteExecutor() {
+        int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+                executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
+        CalciteInternalExecutor executor = new CalciteInternalExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+                maxConnectionsSizePerQuery, connection, driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
+        // TODO Consider CalciteRawExecutor
+        return new CalciteJDBCExecutor(metaDataContexts.getCalciteContextFactory().create(DefaultSchema.LOGIC_NAME, executor));
+    }
+    
     @Override
     public int executeUpdate(final String sql) throws SQLException {
         try {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java
index e8c2976..2eb34e1 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
 import org.apache.shardingsphere.driver.governance.internal.circuit.connection.CircuitBreakerConnection;
 import org.apache.shardingsphere.driver.governance.internal.circuit.resultset.CircuitBreakerResultSet;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
 
 import java.io.InputStream;
 import java.io.Reader;
@@ -270,6 +271,11 @@ public final class CircuitBreakerPreparedStatement extends AbstractUnsupportedOp
     }
     
     @Override
+    protected CalciteExecutor getCalciteExecutor() {
+        return null;
+    }
+    
+    @Override
     public ResultSet executeQuery() {
         return new CircuitBreakerResultSet();
     }