You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by xi...@apache.org on 2020/11/29 14:02:21 UTC
[shardingsphere] branch master updated: Refactor ProxyRawExecutor
(#8410)
This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ff61094 Refactor ProxyRawExecutor (#8410)
ff61094 is described below
commit ff6109453c06fe8d9e01a5d150243936931bf540
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Nov 29 22:01:50 2020 +0800
Refactor ProxyRawExecutor (#8410)
* Add processExecuteQuery and processExecuteUpdate for ProxySQLExecutor
* Refactor RawSQLExecutorCallback
* Refactor ProxySQLExecutor
* Move ProxyRawExecutor to raw package
---
.../raw/callback/RawSQLExecutorCallback.java | 14 ++--
.../backend/communication/ProxySQLExecutor.java | 95 ++++++++++++----------
.../ProxyRawExecutor.java} | 14 ++--
3 files changed, 64 insertions(+), 59 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 6979441..b916303 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback;
+import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
@@ -38,17 +38,17 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
ShardingSphereServiceLoader.register(RawExecutorCallback.class);
}
- private final Collection<RawExecutorCallback> rawExecutorCallbacks;
+ @SuppressWarnings("rawtypes")
+ private final Collection<RawExecutorCallback> callbacks;
public RawSQLExecutorCallback() {
- rawExecutorCallbacks = ShardingSphereServiceLoader.newServiceInstances(RawExecutorCallback.class);
- if (null == rawExecutorCallbacks || rawExecutorCallbacks.isEmpty()) {
- throw new ShardingSphereException("not found raw executor callback impl");
- }
+ callbacks = ShardingSphereServiceLoader.newServiceInstances(RawExecutorCallback.class);
+ Preconditions.checkState(!callbacks.isEmpty(), "No raw executor callback implementation found.");
}
+ @SuppressWarnings("unchecked")
@Override
public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
- return rawExecutorCallbacks.iterator().next().execute(inputs, isTrunkThread, dataMap);
+ return callbacks.iterator().next().execute(inputs, isTrunkThread, dataMap);
}
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 58f2721..90e0dfc 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -42,7 +42,7 @@ import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutorCallback;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.RawProxyExecutor;
+import org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecutor;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.accessor.JDBCAccessor;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
@@ -77,7 +77,7 @@ public final class ProxySQLExecutor {
private final JDBCExecutor jdbcExecutor;
- private final RawProxyExecutor rawExecutor;
+ private final ProxyRawExecutor rawExecutor;
public ProxySQLExecutor(final BackendConnection backendConnection, final JDBCAccessor accessor) {
this.backendConnection = backendConnection;
@@ -85,7 +85,7 @@ public final class ProxySQLExecutor {
ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
boolean isSerialExecute = backendConnection.isSerialExecute();
jdbcExecutor = new JDBCExecutor(executorEngine, isSerialExecute);
- rawExecutor = new RawProxyExecutor(executorEngine, isSerialExecute);
+ rawExecutor = new ProxyRawExecutor(executorEngine, isSerialExecute);
}
/**
@@ -121,66 +121,59 @@ public final class ProxySQLExecutor {
public BackendResponse execute(final ExecutionContext executionContext) throws SQLException {
Collection<ExecuteResult> executeResults = execute(executionContext,
executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- ExecuteResult executeResult = executeResults.iterator().next();
- if (executeResult instanceof QueryResult) {
- ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
- int columnCount = ((QueryResult) executeResult).getMetaData().getColumnCount();
- List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
- for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
- if (hasSelectExpandProjections(executionContext.getSqlStatementContext())) {
- queryHeaders.add(QueryHeaderBuilder.build(
- ((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), (QueryResult) executeResult, metaData, columnIndex));
- } else {
- queryHeaders.add(QueryHeaderBuilder.build((QueryResult) executeResult, metaData, columnIndex));
- }
- }
- return getExecuteQueryResponse(queryHeaders, executeResults);
- } else {
- UpdateResponse result = new UpdateResponse(executeResults);
- if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
- result.setType("INSERT");
- } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
- result.setType("DELETE");
- } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
- result.setType("UPDATE");
- }
- return result;
- }
+ ExecuteResult executeResultSample = executeResults.iterator().next();
+ return executeResultSample instanceof QueryResult
+ ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
}
private Collection<ExecuteResult> execute(final ExecutionContext executionContext, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName());
- return metaData.getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)
- ? executeWithRaw(executionContext, maxConnectionsSizePerQuery)
- : executeWithDriver(executionContext, maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+ Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+ return rules.stream().anyMatch(each -> each instanceof RawExecutionRule)
+ ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery) : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+ }
+
+ private Collection<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
+ RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
+ Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+ // TODO handle query header
+ return rawExecutor.execute(executionGroups, new RawSQLExecutorCallback());
}
- private Collection<ExecuteResult> executeWithDriver(final ExecutionContext executionContext,
- final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
+ private Collection<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
+ final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
DatabaseType databaseType = ProxyContext.getInstance().getMetaDataContexts().getDatabaseType();
- return jdbcExecutor.execute(createExecutionGroups(executionContext.getExecutionUnits(), maxConnectionsSizePerQuery, isReturnGeneratedKeys, executionContext.getRouteContext()),
+ Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups(
+ executionContext.getExecutionUnits(), rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, executionContext.getRouteContext());
+ return jdbcExecutor.execute(executionGroups,
new ProxyJDBCExecutorCallback(databaseType, backendConnection, accessor, isExceptionThrown, isReturnGeneratedKeys, true),
new ProxyJDBCExecutorCallback(databaseType, backendConnection, accessor, isExceptionThrown, isReturnGeneratedKeys, false));
}
- private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups(final Collection<ExecutionUnit> executionUnits, final int maxConnectionsSizePerQuery,
- final boolean isReturnGeneratedKeys, final RouteContext routeContext) throws SQLException {
- Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+ private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups(final Collection<ExecutionUnit> executionUnits, final Collection<ShardingSphereRule> rules,
+ final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys,
+ final RouteContext routeContext) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = accessor.getExecutionPrepareEngine(
backendConnection, maxConnectionsSizePerQuery, new StatementOption(isReturnGeneratedKeys), rules);
return prepareEngine.prepare(routeContext, executionUnits);
}
- private Collection<ExecuteResult> executeWithRaw(final ExecutionContext executionContext, final int maxConnectionsSizePerQuery) throws SQLException {
- Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
- Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules).prepare(executionContext.getRouteContext(),
- executionContext.getExecutionUnits());
- // TODO handle query header
- return rawExecutor.execute(executionGroups, new RawSQLExecutorCallback());
+ private BackendResponse processExecuteQuery(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
+ ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
+ int columnCount = executeResultSample.getMetaData().getColumnCount();
+ List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+ if (hasSelectExpandProjections(executionContext.getSqlStatementContext())) {
+ queryHeaders.add(QueryHeaderBuilder.build(
+ ((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex));
+ } else {
+ queryHeaders.add(QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex));
+ }
+ }
+ return getQueryResponses(queryHeaders, executeResults);
}
- private BackendResponse getExecuteQueryResponse(final List<QueryHeader> queryHeaders, final Collection<ExecuteResult> executeResults) {
+ private BackendResponse getQueryResponses(final List<QueryHeader> queryHeaders, final Collection<ExecuteResult> executeResults) {
QueryResponse result = new QueryResponse(queryHeaders);
for (ExecuteResult each : executeResults) {
result.getQueryResults().add((QueryResult) each);
@@ -191,4 +184,16 @@ public final class ProxySQLExecutor {
private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
}
+
+ private UpdateResponse processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
+ UpdateResponse result = new UpdateResponse(executeResults);
+ if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
+ result.setType("INSERT");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
+ result.setType("DELETE");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
+ result.setType("UPDATE");
+ }
+ return result;
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
similarity index 84%
rename from shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java
rename to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
index c1dcb98..b4e152e 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor;
+package org.apache.shardingsphere.proxy.backend.communication.raw;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
@@ -32,10 +32,10 @@ import java.util.Collections;
import java.util.List;
/**
- * Raw Proxy executor.
+ * Proxy raw executor.
*/
@RequiredArgsConstructor
-public final class RawProxyExecutor {
+public final class ProxyRawExecutor {
private final ExecutorEngine executorEngine;
@@ -46,12 +46,12 @@ public final class RawProxyExecutor {
*
* @param executionGroups execution groups
* @param callback raw SQL execute callback
- * @return return true if is DQL, false if is DML
+ * @return execute results
* @throws SQLException SQL exception
*/
public Collection<ExecuteResult> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
// TODO Load query header for first query
- List<ExecuteResult> results = doExecute(executionGroups, null, callback);
+ List<ExecuteResult> results = execute(executionGroups, null, callback);
// TODO refresh metadata
if (null == results || results.isEmpty() || null == results.get(0)) {
return Collections.singleton(new UpdateResult(0, 0L));
@@ -65,8 +65,8 @@ public final class RawProxyExecutor {
}
@SuppressWarnings("unchecked")
- private <T> List<T> doExecute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups,
- final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
+ private <T> List<T> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups,
+ final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
try {
return executorEngine.execute((Collection) executionGroups, firstCallback, callback, serial);
} catch (final SQLException ex) {