You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/30 13:23:34 UTC

[shardingsphere] branch master updated: Decouple QueryResponse and QueryResult (#8425)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb0aed  Decouple QueryResponse and QueryResult (#8425)
1bb0aed is described below

commit 1bb0aedcfce6e6a7c66622739ca4d07836508df0
Author: Liang Zhang <te...@163.com>
AuthorDate: Mon Nov 30 21:23:06 2020 +0800

    Decouple QueryResponse and QueryResult (#8425)
    
    * Refactor ShowTablesBackendHandler
    
    * Refactor JDBCDatabaseCommunicationEngine
    
    * Move QueryHeader creation to JDBCDatabaseCommunicationEngine
    
    * Refactor JDBCDatabaseCommunicationEngine
    
    * Refactor JDBCDatabaseCommunicationEngine
    
    * Remove useless QueryResponse.queryResults
    
    * Remove useless QueryResponse.queryResults
---
 .../backend/communication/ProxySQLExecutor.java    | 72 ++-----------------
 .../jdbc/JDBCDatabaseCommunicationEngine.java      | 80 +++++++++++++++++-----
 .../backend/response/query/QueryResponse.java      |  4 --
 .../text/admin/ShowTablesBackendHandler.java       |  1 -
 4 files changed, 67 insertions(+), 90 deletions(-)

diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 816ec91..29e00b7 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -17,8 +17,6 @@
 
 package org.apache.shardingsphere.proxy.backend.communication;
 
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -29,11 +27,9 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
@@ -43,23 +39,14 @@ import org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecuto
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
-import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryHeader;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryHeaderBuilder;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
-import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 
 /**
  * Proxy SQL Executor.
@@ -103,22 +90,16 @@ public final class ProxySQLExecutor {
      * Execute SQL.
      *
      * @param executionContext execution context
-     * @return execute response
+     * @return execute results
      * @throws SQLException SQL exception
      */
-    public BackendResponse execute(final ExecutionContext executionContext) throws SQLException {
-        Collection<ExecuteResult> executeResults = execute(executionContext,
-                executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement, SQLExecutorExceptionHandler.isExceptionThrown());
-        ExecuteResult executeResultSample = executeResults.iterator().next();
-        return executeResultSample instanceof QueryResult
-                ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
-    }
-    
-    private Collection<ExecuteResult> execute(final ExecutionContext executionContext, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
-        int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+    public Collection<ExecuteResult> execute(final ExecutionContext executionContext) throws SQLException {
         Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+        int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+        boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement;
         return rules.stream().anyMatch(each -> each instanceof RawExecutionRule) 
-                ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery) : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+                ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
+                : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
     }
     
     private Collection<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
@@ -135,45 +116,4 @@ public final class ProxySQLExecutor {
         Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
         return jdbcExecutor.execute(executionGroups, isExceptionThrown, isReturnGeneratedKeys);
     }
-    
-    private BackendResponse processExecuteQuery(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
-        ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
-        int columnCount = executeResultSample.getMetaData().getColumnCount();
-        List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
-        for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
-            queryHeaders.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
-        }
-        return createQueryResponse(queryHeaders, executeResults);
-    }
-    
-    private QueryHeader createQueryHeader(final ExecutionContext executionContext, 
-                                          final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
-        return hasSelectExpandProjections(executionContext.getSqlStatementContext())
-                ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
-                : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
-    }
-    
-    private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
-        return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
-    }
-    
-    private BackendResponse createQueryResponse(final List<QueryHeader> queryHeaders, final Collection<ExecuteResult> executeResults) {
-        QueryResponse result = new QueryResponse(queryHeaders);
-        for (ExecuteResult each : executeResults) {
-            result.getQueryResults().add((QueryResult) each);
-        }
-        return result;
-    }
-    
-    private UpdateResponse processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
-        UpdateResponse result = new UpdateResponse(executeResults);
-        if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
-            result.setType("INSERT");
-        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
-            result.setType("DELETE");
-        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
-            result.setType("UPDATE");
-        }
-        return result;
-    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 751c2d9..038de73 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -20,9 +20,11 @@ package org.apache.shardingsphere.proxy.backend.communication.jdbc;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
@@ -41,9 +43,13 @@ import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
 import org.apache.shardingsphere.proxy.backend.response.query.QueryData;
 import org.apache.shardingsphere.proxy.backend.response.query.QueryHeader;
+import org.apache.shardingsphere.proxy.backend.response.query.QueryHeaderBuilder;
 import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
 import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -67,7 +73,7 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
     
     private final KernelProcessor kernelProcessor = new KernelProcessor();
     
-    private BackendResponse response;
+    private List<QueryHeader> queryHeaders;
     
     private MergedResult mergedResult;
     
@@ -89,10 +95,55 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
             return new UpdateResponse();
         }
         proxySQLExecutor.checkExecutePrerequisites(executionContext);
-        response = proxySQLExecutor.execute(executionContext);
+        Collection<ExecuteResult> executeResults = proxySQLExecutor.execute(executionContext);
+        ExecuteResult executeResultSample = executeResults.iterator().next();
+        return executeResultSample instanceof QueryResult
+                ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
+    }
+    
+    private QueryResponse processExecuteQuery(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
+        queryHeaders = createQueryHeaders(executionContext, executeResultSample);
+        mergedResult = mergeQuery(executionContext.getSqlStatementContext(), executeResults.stream().map(each -> (QueryResult) each).collect(Collectors.toList()));
+        return new QueryResponse(queryHeaders);
+    }
+    
+    private List<QueryHeader> createQueryHeaders(final ExecutionContext executionContext, final QueryResult executeResultSample) throws SQLException {
+        int columnCount = executeResultSample.getMetaData().getColumnCount();
+        List<QueryHeader> result = new ArrayList<>(columnCount);
+        for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+            result.add(createQueryHeader(executionContext, executeResultSample, metaData, columnIndex));
+        }
+        return result;
+    }
+    
+    private QueryHeader createQueryHeader(final ExecutionContext executionContext,
+                                          final QueryResult executeResultSample, final ShardingSphereMetaData metaData, final int columnIndex) throws SQLException {
+        return hasSelectExpandProjections(executionContext.getSqlStatementContext())
+                ? QueryHeaderBuilder.build(((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex)
+                : QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex);
+    }
+    
+    private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
+        return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
+    }
+    
+    private UpdateResponse processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) throws SQLException {
+        UpdateResponse result = createUpdateResponse(executionContext, executeResults);
         refreshSchema(executionContext);
-        merge(executionContext.getSqlStatementContext());
-        return response;
+        mergeUpdateCount(executionContext.getSqlStatementContext(), result);
+        return result;
+    }
+    
+    private UpdateResponse createUpdateResponse(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
+        UpdateResponse result = new UpdateResponse(executeResults);
+        if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
+            result.setType("INSERT");
+        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
+            result.setType("DELETE");
+        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
+            result.setType("UPDATE");
+        }
+        return result;
     }
     
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -115,17 +166,15 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
         OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
     }
     
-    private void merge(final SQLStatementContext<?> sqlStatementContext) throws SQLException {
-        if (response instanceof UpdateResponse) {
-            mergeUpdateCount(sqlStatementContext);
-            return;
-        }
-        mergedResult = mergeQuery(sqlStatementContext, ((QueryResponse) response).getQueryResults());
+    private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
+        MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+                metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
+        return mergeEngine.merge(queryResults, sqlStatementContext);
     }
     
-    private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext) {
+    private void mergeUpdateCount(final SQLStatementContext<?> sqlStatementContext, final UpdateResponse response) {
         if (isNeedAccumulate(sqlStatementContext)) {
-            ((UpdateResponse) response).mergeUpdateCount();
+            response.mergeUpdateCount();
         }
     }
     
@@ -135,12 +184,6 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
         return dataNodeContainedRule.isPresent() && dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
     }
     
-    private MergedResult mergeQuery(final SQLStatementContext<?> sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
-        MergeEngine mergeEngine = new MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(), 
-                metaData.getSchema(), ProxyContext.getInstance().getMetaDataContexts().getProps(), metaData.getRuleMetaData().getRules());
-        return mergeEngine.merge(queryResults, sqlStatementContext);
-    }
-    
     @Override
     public boolean next() throws SQLException {
         return null != mergedResult && mergedResult.next();
@@ -148,7 +191,6 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
     
     @Override
     public QueryData getQueryData() throws SQLException {
-        List<QueryHeader> queryHeaders = ((QueryResponse) response).getQueryHeaders();
         List<Object> row = new ArrayList<>(queryHeaders.size());
         for (int columnIndex = 1; columnIndex <= queryHeaders.size(); columnIndex++) {
             row.add(mergedResult.getValue(columnIndex, Object.class));
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
index 5d3eb1f..63acc61 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
@@ -19,10 +19,8 @@ package org.apache.shardingsphere.proxy.backend.response.query;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
 
-import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -33,6 +31,4 @@ import java.util.List;
 public final class QueryResponse implements BackendResponse {
     
     private final List<QueryHeader> queryHeaders;
-    
-    private final List<QueryResult> queryResults = new LinkedList<>();
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
index 1aa0001..0bbc38a 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
@@ -59,7 +59,6 @@ public final class ShowTablesBackendHandler implements TextProtocolBackendHandle
         Collection<String> allTableNames = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getSchema().getAllTableNames();
         List<MemoryQueryResultDataRow> rows = allTableNames.stream().map(each -> new MemoryQueryResultDataRow(Collections.singletonList(each))).collect(Collectors.toList());
         queryResult = new RawMemoryQueryResult(metaData, rows);
-        result.getQueryResults().add(queryResult);
         return result;
     }