You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by wu...@apache.org on 2021/01/03 12:46:10 UTC
[shardingsphere] branch master updated: create and use CalciteExecutor in Statements (#8866)
This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f1c9bae create and use CalciteExecutor in Statements (#8866)
f1c9bae is described below
commit f1c9baeb70fdcf798ef55d74046a48f6bc4d1dd4
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Sun Jan 3 20:45:49 2021 +0800
create and use CalciteExecutor in Statements (#8866)
* create and use CalciteExecutor
* null == executor?
* add fun
---
.../infra/optimize/execute/CalciteExecutor.java | 46 ++++++++++++++++++++++
.../optimize/execute/CalciteJDBCExecutor.java | 25 ++++++------
.../infra/optimize/execute/CalciteRawExecutor.java | 23 +++++------
.../driver/executor/DriverJDBCExecutor.java | 2 +
.../jdbc/adapter/AbstractStatementAdapter.java | 11 ++++++
.../statement/ShardingSpherePreparedStatement.java | 33 ++++++++++++++++
.../core/statement/ShardingSphereStatement.java | 31 +++++++++++++++
.../statement/CircuitBreakerPreparedStatement.java | 6 +++
8 files changed, 153 insertions(+), 24 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteExecutor.java
new file mode 100644
index 0000000..6e17098
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteExecutor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.optimize.execute;
+
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Calcite executor.
+ */
+public interface CalciteExecutor {
+
+ /**
+ * Execute query.
+ *
+ * @param sql sql
+ * @param parameters parameters
+ * @return execute result
+ * @throws SQLException SQL exception
+ */
+ List<QueryResult> executeQuery(String sql, List<Object> parameters) throws SQLException;
+
+ /**
+ * Close.
+ *
+ * @throws SQLException SQL exception
+ */
+ void close() throws SQLException;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
index e81f3ff..5e8b0e5 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteJDBCExecutor.java
@@ -29,7 +29,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Collection;
+import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -37,7 +37,7 @@ import java.util.Properties;
/**
* Calcite JDBC executor.
*/
-public final class CalciteJDBCExecutor {
+public final class CalciteJDBCExecutor implements CalciteExecutor {
public static final String CONNECTION_URL = "jdbc:calcite:";
@@ -47,6 +47,8 @@ public final class CalciteJDBCExecutor {
private final CalciteContext context;
+ private Statement statement;
+
static {
try {
Class.forName(DRIVER_NAME);
@@ -61,22 +63,23 @@ public final class CalciteJDBCExecutor {
PROPERTIES.setProperty(CalciteConnectionProperty.CONFORMANCE.camelName(), context.getConnectionProperties().getProperty(CalciteConnectionProperty.CONFORMANCE.camelName()));
}
- /**
- * Execute query.
- *
- * @param sql sql
- * @param parameters parameters
- * @return execute result
- * @throws SQLException SQL exception
- */
- public Collection<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
+ @Override
+ public List<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
QueryResult result = new JDBCStreamQueryResult(execute(sql, parameters));
return Collections.singletonList(result);
}
+ @Override
+ public void close() throws SQLException {
+ Connection connection = statement.getConnection();
+ connection.close();
+ statement.close();
+ }
+
private ResultSet execute(final String sql, final List<Object> parameters) throws SQLException {
PreparedStatement statement = getConnection().prepareStatement(sql);
setParameters(statement, parameters);
+ this.statement = statement;
return statement.executeQuery();
}
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java
index e019a78..3b6fa78 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteRawExecutor.java
@@ -29,8 +29,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.optimize.context.CalciteContext;
import org.apache.shardingsphere.infra.optimize.context.CalciteDataContext;
@@ -42,23 +41,21 @@ import java.util.List;
* Calcite raw executor.
*/
@RequiredArgsConstructor
-public final class CalciteRawExecutor {
+public final class CalciteRawExecutor implements CalciteExecutor {
private final CalciteContext context;
- /**
- * Execute query.
- *
- * @param executionContext execution context
- * @param callback JDBC execute callback
- * @param <T> class type of return value
- * @return execute result
- * @throws SQLException SQL exception
- */
- public <T> List<T> executeQuery(final ExecutionContext executionContext, final JDBCExecutorCallback<T> callback) throws SQLException {
+ @Override
+ public List<QueryResult> executeQuery(final String sql, final List<Object> parameters) throws SQLException {
+ // TODO
return Collections.emptyList();
}
+ @Override
+ public void close() throws SQLException {
+ // TODO
+ }
+
private Enumerable<Object[]> execute(final String sql) throws SqlParseException {
// TODO The below will be replaced by SqlNodeConverter.
SqlNode sqlNode = SqlParser.create(sql, context.getParserConfig()).parseQuery();
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 1bab80e..1a8db30 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.driver.executor;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
@@ -67,6 +68,7 @@ public final class DriverJDBCExecutor {
private final MetaDataContexts metaDataContexts;
+ @Getter
private final JDBCExecutor jdbcExecutor;
/**
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 19960c0..b0f9b83 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.driver.jdbc.adapter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationStatement;
import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
import java.sql.SQLException;
import java.sql.SQLWarning;
@@ -50,11 +51,19 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
closed = true;
try {
forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
+ closeCalciteExecutor();
} finally {
getRoutedStatements().clear();
}
}
+ private void closeCalciteExecutor() throws SQLException {
+ CalciteExecutor executor = getCalciteExecutor();
+ if (null != executor) {
+ executor.close();
+ }
+ }
+
@Override
public final boolean isClosed() {
return closed;
@@ -203,4 +212,6 @@ public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperat
protected abstract boolean isAccumulate();
protected abstract Collection<? extends Statement> getRoutedStatements();
+
+ protected abstract CalciteExecutor getCalciteExecutor();
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 6c7d6a3..aef17db 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -18,11 +18,13 @@
package org.apache.shardingsphere.driver.jdbc.core.statement;
import com.google.common.base.Strings;
+import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
+import org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.driver.jdbc.core.constant.SQLExceptionConstant;
@@ -38,10 +40,12 @@ import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementConte
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -62,6 +66,9 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteJDBCExecutor;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
@@ -77,6 +84,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
@@ -119,6 +127,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
private ResultSet currentResultSet;
+ @Getter(AccessLevel.PROTECTED)
+ private CalciteExecutor calciteExecutor;
+
public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}
@@ -178,12 +189,34 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
}
+ if (executionContext.getRouteContext().isToCalcite()) {
+ return executeQueryByCalcite();
+ }
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
return driverJDBCExecutor.executeQuery(executionGroups,
new PreparedStatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(), sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown()));
}
+ private List<QueryResult> executeQueryByCalcite() throws SQLException {
+ if (executionContext.getExecutionUnits().isEmpty()) {
+ return Collections.emptyList();
+ }
+ calciteExecutor = createCalciteExecutor();
+ SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+ return calciteExecutor.executeQuery(sqlUnit.getSql(), sqlUnit.getParameters());
+ }
+
+ private CalciteExecutor createCalciteExecutor() {
+ int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+ executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
+ CalciteInternalExecutor executor = new CalciteInternalExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+ maxConnectionsSizePerQuery, connection, driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
+ // TODO Consider CalciteRawExecutor
+ return new CalciteJDBCExecutor(metaDataContexts.getCalciteContextFactory().create(DefaultSchema.LOGIC_NAME, executor));
+ }
+
@Override
public int executeUpdate() throws SQLException {
try {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 905c274..6d50251 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.driver.jdbc.core.statement;
import com.google.common.base.Strings;
+import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.driver.executor.DriverJDBCExecutor;
import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
@@ -37,10 +38,12 @@ import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementConte
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -61,6 +64,9 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteJDBCExecutor;
import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
@@ -107,6 +113,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
private ResultSet currentResultSet;
+ @Getter(AccessLevel.PROTECTED)
+ private CalciteExecutor calciteExecutor;
+
public ShardingSphereStatement(final ShardingSphereConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
@@ -149,6 +158,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
return rawExecutor.execute(createRawExecutionGroups(), new RawSQLExecutorCallback()).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
}
+ if (executionContext.getRouteContext().isToCalcite()) {
+ return executeQueryByCalcite();
+ }
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups();
cacheStatements(executionGroups);
StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
@@ -156,6 +168,25 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return driverJDBCExecutor.executeQuery(executionGroups, callback);
}
+ private List<QueryResult> executeQueryByCalcite() throws SQLException {
+ if (executionContext.getExecutionUnits().isEmpty()) {
+ return Collections.emptyList();
+ }
+ calciteExecutor = createCalciteExecutor();
+ SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+ return calciteExecutor.executeQuery(sqlUnit.getSql(), sqlUnit.getParameters());
+ }
+
+ private CalciteExecutor createCalciteExecutor() {
+ int maxConnectionsSizePerQuery = metaDataContexts.getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ StatementExecuteQueryCallback callback = new StatementExecuteQueryCallback(metaDataContexts.getDefaultMetaData().getResource().getDatabaseType(),
+ executionContext.getSqlStatementContext().getSqlStatement(), SQLExecutorExceptionHandler.isExceptionThrown());
+ CalciteInternalExecutor executor = new CalciteInternalExecutor(metaDataContexts.getDefaultMetaData().getRuleMetaData().getRules(),
+ maxConnectionsSizePerQuery, connection, driverJDBCExecutor.getJdbcExecutor(), executionContext, callback);
+ // TODO Consider CalciteRawExecutor
+ return new CalciteJDBCExecutor(metaDataContexts.getCalciteContextFactory().create(DefaultSchema.LOGIC_NAME, executor));
+ }
+
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java
index e8c2976..2eb34e1 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-governance/src/main/java/org/apache/shardingsphere/driver/governance/internal/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement;
import org.apache.shardingsphere.driver.governance.internal.circuit.connection.CircuitBreakerConnection;
import org.apache.shardingsphere.driver.governance.internal.circuit.resultset.CircuitBreakerResultSet;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
import java.io.InputStream;
import java.io.Reader;
@@ -270,6 +271,11 @@ public final class CircuitBreakerPreparedStatement extends AbstractUnsupportedOp
}
@Override
+ protected CalciteExecutor getCalciteExecutor() {
+ return null;
+ }
+
+ @Override
public ResultSet executeQuery() {
return new CircuitBreakerResultSet();
}