You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by xi...@apache.org on 2020/11/29 14:02:21 UTC

[shardingsphere] branch master updated: Refactor ProxyRawExecutor (#8410)

This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ff61094  Refactor ProxyRawExecutor (#8410)
ff61094 is described below

commit ff6109453c06fe8d9e01a5d150243936931bf540
Author: Liang Zhang <te...@163.com>
AuthorDate: Sun Nov 29 22:01:50 2020 +0800

    Refactor ProxyRawExecutor (#8410)
    
    * Add processExecuteQuery and processExecuteUpdate for ProxySQLExecutor
    
    * Refactor RawSQLExecutorCallback
    
    * Refactor ProxySQLExecutor
    
    * Move ProxyRawExecutor to raw package
---
 .../raw/callback/RawSQLExecutorCallback.java       | 14 ++--
 .../backend/communication/ProxySQLExecutor.java    | 95 ++++++++++++----------
 .../ProxyRawExecutor.java}                         | 14 ++--
 3 files changed, 64 insertions(+), 59 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 6979441..b916303 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback;
 
+import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
@@ -38,17 +38,17 @@ public final class RawSQLExecutorCallback implements ExecutorCallback<RawSQLExec
         ShardingSphereServiceLoader.register(RawExecutorCallback.class);
     }
     
-    private final Collection<RawExecutorCallback> rawExecutorCallbacks;
+    @SuppressWarnings("rawtypes")
+    private final Collection<RawExecutorCallback> callbacks;
     
     public RawSQLExecutorCallback() {
-        rawExecutorCallbacks = ShardingSphereServiceLoader.newServiceInstances(RawExecutorCallback.class);
-        if (null == rawExecutorCallbacks || rawExecutorCallbacks.isEmpty()) {
-            throw new ShardingSphereException("not found raw executor callback impl");
-        }
+        callbacks = ShardingSphereServiceLoader.newServiceInstances(RawExecutorCallback.class);
+        Preconditions.checkState(!callbacks.isEmpty(), "No raw executor callback implementation found.");
     }
     
+    @SuppressWarnings("unchecked")
     @Override
     public Collection<ExecuteResult> execute(final Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
-        return rawExecutorCallbacks.iterator().next().execute(inputs, isTrunkThread, dataMap);
+        return callbacks.iterator().next().execute(inputs, isTrunkThread, dataMap);
     }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 58f2721..90e0dfc 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -42,7 +42,7 @@ import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutorCallback;
-import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.RawProxyExecutor;
+import org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecutor;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.accessor.JDBCAccessor;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
@@ -77,7 +77,7 @@ public final class ProxySQLExecutor {
     
     private final JDBCExecutor jdbcExecutor;
     
-    private final RawProxyExecutor rawExecutor;
+    private final ProxyRawExecutor rawExecutor;
     
     public ProxySQLExecutor(final BackendConnection backendConnection, final JDBCAccessor accessor) {
         this.backendConnection = backendConnection;
@@ -85,7 +85,7 @@ public final class ProxySQLExecutor {
         ExecutorEngine executorEngine = BackendExecutorContext.getInstance().getExecutorEngine();
         boolean isSerialExecute = backendConnection.isSerialExecute();
         jdbcExecutor = new JDBCExecutor(executorEngine, isSerialExecute);
-        rawExecutor = new RawProxyExecutor(executorEngine, isSerialExecute);
+        rawExecutor = new ProxyRawExecutor(executorEngine, isSerialExecute);
     }
     
     /**
@@ -121,66 +121,59 @@ public final class ProxySQLExecutor {
     public BackendResponse execute(final ExecutionContext executionContext) throws SQLException {
         Collection<ExecuteResult> executeResults = execute(executionContext,
                 executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement, SQLExecutorExceptionHandler.isExceptionThrown());
-        ExecuteResult executeResult = executeResults.iterator().next();
-        if (executeResult instanceof QueryResult) {
-            ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
-            int columnCount = ((QueryResult) executeResult).getMetaData().getColumnCount();
-            List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
-            for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
-                if (hasSelectExpandProjections(executionContext.getSqlStatementContext())) {
-                    queryHeaders.add(QueryHeaderBuilder.build(
-                            ((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), (QueryResult) executeResult, metaData, columnIndex));
-                } else {
-                    queryHeaders.add(QueryHeaderBuilder.build((QueryResult) executeResult, metaData, columnIndex));
-                }
-            }
-            return getExecuteQueryResponse(queryHeaders, executeResults);
-        } else {
-            UpdateResponse result = new UpdateResponse(executeResults);
-            if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
-                result.setType("INSERT");
-            } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
-                result.setType("DELETE");
-            } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
-                result.setType("UPDATE");
-            }
-            return result;
-        }
+        ExecuteResult executeResultSample = executeResults.iterator().next();
+        return executeResultSample instanceof QueryResult
+                ? processExecuteQuery(executionContext, executeResults, (QueryResult) executeResultSample) : processExecuteUpdate(executionContext, executeResults);
     }
     
     private Collection<ExecuteResult> execute(final ExecutionContext executionContext, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
         int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
-        ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName());
-        return metaData.getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule) 
-                ? executeWithRaw(executionContext, maxConnectionsSizePerQuery)
-                : executeWithDriver(executionContext, maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+        Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+        return rules.stream().anyMatch(each -> each instanceof RawExecutionRule) 
+                ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery) : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+    }
+    
+    private Collection<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
+        RawExecutionPrepareEngine prepareEngine = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules);
+        Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups = prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits());
+        // TODO handle query header
+        return rawExecutor.execute(executionGroups, new RawSQLExecutorCallback());
     }
     
-    private Collection<ExecuteResult> executeWithDriver(final ExecutionContext executionContext,
-                                                        final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
+    private Collection<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, 
+                                                         final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
         DatabaseType databaseType = ProxyContext.getInstance().getMetaDataContexts().getDatabaseType();
-        return jdbcExecutor.execute(createExecutionGroups(executionContext.getExecutionUnits(), maxConnectionsSizePerQuery, isReturnGeneratedKeys, executionContext.getRouteContext()),
+        Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups(
+                executionContext.getExecutionUnits(), rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, executionContext.getRouteContext());
+        return jdbcExecutor.execute(executionGroups,
                 new ProxyJDBCExecutorCallback(databaseType, backendConnection, accessor, isExceptionThrown, isReturnGeneratedKeys, true),
                 new ProxyJDBCExecutorCallback(databaseType, backendConnection, accessor, isExceptionThrown, isReturnGeneratedKeys, false));
     }
     
-    private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups(final Collection<ExecutionUnit> executionUnits, final int maxConnectionsSizePerQuery,
-                                                                                final boolean isReturnGeneratedKeys, final RouteContext routeContext) throws SQLException {
-        Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+    private Collection<ExecutionGroup<JDBCExecutionUnit>> createExecutionGroups(final Collection<ExecutionUnit> executionUnits, final Collection<ShardingSphereRule> rules, 
+                                                                                final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, 
+                                                                                final RouteContext routeContext) throws SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = accessor.getExecutionPrepareEngine(
                 backendConnection, maxConnectionsSizePerQuery, new StatementOption(isReturnGeneratedKeys), rules);
         return prepareEngine.prepare(routeContext, executionUnits);
     }
     
-    private Collection<ExecuteResult> executeWithRaw(final ExecutionContext executionContext, final int maxConnectionsSizePerQuery) throws SQLException {
-        Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
-        Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups = new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, rules).prepare(executionContext.getRouteContext(),
-                executionContext.getExecutionUnits());
-        // TODO handle query header
-        return rawExecutor.execute(executionGroups, new RawSQLExecutorCallback());
+    private BackendResponse processExecuteQuery(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults, final QueryResult executeResultSample) throws SQLException {
+        ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
+        int columnCount = executeResultSample.getMetaData().getColumnCount();
+        List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
+        for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+            if (hasSelectExpandProjections(executionContext.getSqlStatementContext())) {
+                queryHeaders.add(QueryHeaderBuilder.build(
+                        ((SelectStatementContext) executionContext.getSqlStatementContext()).getProjectionsContext(), executeResultSample, metaData, columnIndex));
+            } else {
+                queryHeaders.add(QueryHeaderBuilder.build(executeResultSample, metaData, columnIndex));
+            }
+        }
+        return getQueryResponses(queryHeaders, executeResults);
     }
     
-    private BackendResponse getExecuteQueryResponse(final List<QueryHeader> queryHeaders, final Collection<ExecuteResult> executeResults) {
+    private BackendResponse getQueryResponses(final List<QueryHeader> queryHeaders, final Collection<ExecuteResult> executeResults) {
         QueryResponse result = new QueryResponse(queryHeaders);
         for (ExecuteResult each : executeResults) {
             result.getQueryResults().add((QueryResult) each);
@@ -191,4 +184,16 @@ public final class ProxySQLExecutor {
     private boolean hasSelectExpandProjections(final SQLStatementContext<?> sqlStatementContext) {
         return sqlStatementContext instanceof SelectStatementContext && !((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
     }
+    
+    private UpdateResponse processExecuteUpdate(final ExecutionContext executionContext, final Collection<ExecuteResult> executeResults) {
+        UpdateResponse result = new UpdateResponse(executeResults);
+        if (executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement) {
+            result.setType("INSERT");
+        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof DeleteStatement) {
+            result.setType("DELETE");
+        } else if (executionContext.getSqlStatementContext().getSqlStatement() instanceof UpdateStatement) {
+            result.setType("UPDATE");
+        }
+        return result;
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
similarity index 84%
rename from shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java
rename to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
index c1dcb98..b4e152e 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/RawProxyExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/raw/ProxyRawExecutor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor;
+package org.apache.shardingsphere.proxy.backend.communication.raw;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
@@ -32,10 +32,10 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Raw Proxy executor.
+ * Proxy raw executor.
  */
 @RequiredArgsConstructor
-public final class RawProxyExecutor {
+public final class ProxyRawExecutor {
     
     private final ExecutorEngine executorEngine;
     
@@ -46,12 +46,12 @@ public final class RawProxyExecutor {
      *
      * @param executionGroups execution groups
      * @param callback raw SQL execute callback
-     * @return return true if is DQL, false if is DML
+     * @return execute results
      * @throws SQLException SQL exception
      */
     public Collection<ExecuteResult> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, final RawSQLExecutorCallback callback) throws SQLException {
         // TODO Load query header for first query
-        List<ExecuteResult> results = doExecute(executionGroups, null, callback);
+        List<ExecuteResult> results = execute(executionGroups, null, callback);
         // TODO refresh metadata
         if (null == results || results.isEmpty() || null == results.get(0)) {
             return Collections.singleton(new UpdateResult(0, 0L));
@@ -65,8 +65,8 @@ public final class RawProxyExecutor {
     }
     
     @SuppressWarnings("unchecked")
-    private <T> List<T> doExecute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, 
-                                  final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
+    private <T> List<T> execute(final Collection<ExecutionGroup<RawSQLExecutionUnit>> executionGroups, 
+                                final RawSQLExecutorCallback firstCallback, final RawSQLExecutorCallback callback) throws SQLException {
         try {
             return executorEngine.execute((Collection) executionGroups, firstCallback, callback, serial);
         } catch (final SQLException ex) {