You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/30 13:23:34 UTC
[shardingsphere] branch master updated: Decouple QueryResponse and QueryResult (#8425)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb0aed Decouple QueryResponse and QueryResult (#8425)
1bb0aed is described below
commit 1bb0aedcfce6e6a7c66622739ca4d07836508df0
Author: Liang Zhang <te...@163.com>
AuthorDate: Mon Nov 30 21:23:06 2020 +0800
Decouple QueryResponse and QueryResult (#8425)
* Refactor ShowTablesBackendHandler
* Refactor JDBCDatabaseCommunicationEngine
* Move QueryHeader creation to JDBCDatabaseCommunicationEngine
* Refactor JDBCDatabaseCommunicationEngine
* Refactor JDBCDatabaseCommunicationEngine
* Remove useless QueryResponse.queryResults
* Remove useless QueryResponse.queryResults
---
.../backend/communication/ProxySQLExecutor.java | 72 ++-----------------
.../jdbc/JDBCDatabaseCommunicationEngine.java | 80 +++++++++++++++++-----
.../backend/response/query/QueryResponse.java | 4 --
.../text/admin/ShowTablesBackendHandler.java | 1 -
4 files changed, 67 insertions(+), 90 deletions(-)
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 816ec91..29e00b7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -17,8 +17,6 @@
package org.apache.shardingsphere.proxy.backend.communication;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -29,11 +27,9 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
@@ -43,23 +39,14 @@ import org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecuto
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
-import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryHeader;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryHeaderBuilder;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
-import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
import org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
/**
* Proxy SQL Executor.
@@ -103,22 +90,16 @@ public final class ProxySQLExecutor {
* Execute SQL.
*
* @param executionContext execution context
- * @return execute response
+ * @return execute results
* @throws SQLException SQL exception
*/
- public BackendResponse execute(final ExecutionContext executionContext) throws SQLException {
- Collection<ExecuteResult> executeResults = execute(executionContext,
- executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- ExecuteResult executeResultSample = executeResults.iterator().next();
- return executeResultSample instanceof QueryResult
- ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
- }
-
- private Collection<ExecuteResult> execute(final ExecutionContext executionContext, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
- int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ public Collection<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+ int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement;
return rules.stream().anyMatch(each -> each instanceof RawExecutionRule)
- ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery) : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+ ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
+ : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
}
private Collection<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
@@ -135,45 +116,4 @@ public final class ProxySQLExecutor {
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
return jdbcExecutor.execute(executionGroups, isExceptionThrown, isReturnGeneratedKeys);
}
-
- private BackendResponse processExecuteQuery(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
- ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
- int columnCount = executeResultSample.getMetaData().getColumnCount();
- List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
- for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
- queryHeaders.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
- }
- return createQueryResponse(queryHeaders, executeResults);
- }
-
- private QueryHeader createQueryHeader(final ExecutionContext executionContext,
- final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
- return hasSelectExpandProjections(executionContext.getSqlStatementContext())
- ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
- : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
- }
-
- private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
- return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
- }
-
- private BackendResponse createQueryResponse(final List<QueryHeader> queryHeaders, final Collection<ExecuteResult> executeResults) {
- QueryResponse result = new QueryResponse(queryHeaders);
- for (ExecuteResult each : executeResults) {
- result.getQueryResults().add((QueryResult) each);
- }
- return result;
- }
-
- private UpdateResponse processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
- UpdateResponse result = new UpdateResponse(executeResults);
- if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
- result.setType("INSERT");
- } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
- result.setType("DELETE");
- } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
- result.setType("UPDATE");
- }
- return result;
- }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 751c2d9..038de73 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -20,9 +20,11 @@ package org.apache.shardingsphere.proxy.backend.communication.jdbc;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
import org.apache.shardingsphere.infra.merge.MergeEngine;
@@ -41,9 +43,13 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryData;
import org.apache.shardingsphere.proxy.backend.response.query.QueryHeader;
+import org.apache.shardingsphere.proxy.backend.response.query.QueryHeaderBuilder;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -67,7 +73,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
private final KernelProcessor kernelProcessor = new KernelProcessor();
- private BackendResponse response;
+ private List<QueryHeader> queryHeaders;
private MergedResult mergedResult;
@@ -89,10 +95,55 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
return new UpdateResponse();
}
proxySQLExecutor.checkExecutePrerequisites(executionContext);
- response = proxySQLExecutor.execute(executionContext);
+ Collection<ExecuteResult> executeResults = proxySQLExecutor.execute(executionContext);
+ ExecuteResult executeResultSample = executeResults.iterator().next();
+ return executeResultSample instanceof QueryResult
+ ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
+ }
+
+ private QueryResponse processExecuteQuery(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
+ queryHeaders = createQueryHeaders(executionContext, executeResultSample);
+ mergedResult = mergeQuery(executionContext.getSqlStatementContext(), executeResults.stream().map(each -> (QueryResult) each).collect(Collectors.toList()));
+ return new QueryResponse(queryHeaders);
+ }
+
+ private List<QueryHeader> createQueryHeaders(final ExecutionContext executionContext, final QueryResult executeResultSample) throws SQLException {
+ int columnCount = executeResultSample.getMetaData().getColumnCount();
+ List<QueryHeader> result = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+ result.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
+ }
+ return result;
+ }
+
+ private QueryHeader createQueryHeader(final ExecutionContext executionContext,
+ final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
+ return hasSelectExpandProjections(executionContext.getSqlStatementContext())
+ ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
+ : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
+ }
+
+ private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
+ return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
+ }
+
+ private UpdateResponse processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) throws SQLException {
+ UpdateResponse result = createUpdateResponse(executionContext, executeResults);
refreshSchema(executionContext);
- merge(executionContext.getSqlStatementContext());
- return response;
+ mergeUpdateCount(executionContext.getSqlStatementContext(), result);
+ return result;
+ }
+
+ private UpdateResponse createUpdateResponse(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
+ UpdateResponse result = new UpdateResponse(executeResults);
+ if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
+ result.setType("INSERT");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
+ result.setType("DELETE");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
+ result.setType("UPDATE");
+ }
+ return result;
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -115,17 +166,15 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
}
- private void merge(final SQLStatementContext<?> sqlStatementContext) throws SQLException {
- if (response instanceof UpdateResponse) {
- mergeUpdateCount(sqlStatementContext);
- return;
- }
- mergedResult = mergeQuery(sqlStatementContext, ((QueryResponse) response).getQueryResults());
+ private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
+ MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+ metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
+ return mergeEngine.merge(queryResults, sqlStatementContext);
}
- private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext) {
+ private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext, final UpdateResponse response) {
if (isNeedAccumulate(sqlStatementContext)) {
- ((UpdateResponse) response).mergeUpdateCount();
+ response.mergeUpdateCount();
}
}
@@ -135,12 +184,6 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
return dataNodeContainedRule.isPresent() && dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
}
- private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
- MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
- metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
@Override
public boolean next() throws SQLException {
return null != mergedResult && mergedResult.next();
@@ -148,7 +191,6 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
@Override
public QueryData getQueryData() throws SQLException {
- List<QueryHeader> queryHeaders = ((QueryResponse) response).getQueryHeaders();
List<Object> row = new ArrayList<>(queryHeaders.size());
for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
row.add(mergedResult.getValue(columnIndex, Object.class));
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
index 5d3eb1f..63acc61 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
@@ -19,10 +19,8 @@ package org.apache.shardingsphere.proxy.backend.response.query;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
-import java.util.LinkedList;
import java.util.List;
/**
@@ -33,6 +31,4 @@ import java.util.List;
public final class QueryResponse implements BackendResponse {
private final List<QueryHeader> queryHeaders;
-
- private final List<QueryResult> queryResults = new LinkedList<>();
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
index 1aa0001..0bbc38a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
@@ -59,7 +59,6 @@ public final class ShowTablesBackendHandler implements TextProtocolBackendHandle
Collection<String> allTableNames = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getSchema().getAllTableNames();
List<MemoryQueryResultDataRow> rows = allTableNames.stream().map(each -> new MemoryQueryResultDataRow(Collections.singletonList(each))).collect(Collectors.toList());
queryResult = new RawMemoryQueryResult(metaData, rows);
- result.getQueryResults().add(queryResult);
return result;
}