You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2021/01/03 23:35:38 UTC

[shardingsphere] branch master updated: Use CalciteInternalExecutor in PROXY (#8869)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 50784a2  Use CalciteInternalExecutor in PROXY (#8869)
50784a2 is described below

commit 50784a276555c593218525c0aa5eed923a3524e1
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Mon Jan 4 07:35:04 2021 +0800

    Use CalciteInternalExecutor in PROXY (#8869)
    
    * Use CalciteInternalExecutor in PROXY
    
    * check style
    
    * fix ut
---
 .../optimize/execute/CalciteInternalExecutor.java  |  6 ++--
 .../backend/communication/ProxySQLExecutor.java    | 41 +++++++++++++++++++---
 .../jdbc/connection/BackendConnection.java         | 21 +++++++++++
 .../jdbc/executor/ProxyJDBCExecutor.java           |  2 ++
 .../frontend/command/CommandExecutorTask.java      |  1 +
 .../netty/FrontendChannelInboundHandler.java       |  1 +
 .../frontend/command/CommandExecutorTaskTest.java  |  4 +++
 7 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java
index af3d53e..7976f6c 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.stream.Collectors;
 
 /**
  * Calcite internal executor.
@@ -52,7 +54,7 @@ public final class CalciteInternalExecutor {
     
     private final ExecutionContext executionContext;
     
-    private final JDBCExecutorCallback<QueryResult> callback;
+    private final JDBCExecutorCallback<? extends ExecuteResult> callback;
     
     /**
      * Execute.
@@ -62,7 +64,7 @@ public final class CalciteInternalExecutor {
     public Collection<QueryResult> execute() {
         try {
             Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups(executionContext);
-            return jdbcExecutor.execute(executionGroups, callback);
+            return jdbcExecutor.execute(executionGroups, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
         } catch (final SQLException ex) {
             throw new ShardingSphereException(ex);
         }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 59b91fb..f61856d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -18,24 +18,31 @@
 package org.apache.shardingsphere.proxy.backend.communication;
 
 import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteJDBCExecutor;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -47,6 +54,8 @@ import org.apache.shardingsphere.transaction.core.TransactionType;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
 
 /**
  * Proxy SQL Executor.
@@ -97,9 +106,18 @@ public final class ProxySQLExecutor {
         Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaDataContexts().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
         int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement;
-        return rules.stream().anyMatch(each -> each instanceof RawExecutionRule) 
-                ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
-                : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
+        return execute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys);
+    }
+    
+    private Collection<ExecuteResult> execute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, 
+                                              final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys) throws SQLException {
+        if (rules.stream().anyMatch(each -> each instanceof RawExecutionRule)) {
+            return rawExecute(executionContext, rules, maxConnectionsSizePerQuery);
+        }
+        if (executionContext.getRouteContext().isToCalcite()) {
+            return useCalciteToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
+        }
+        return useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
     }
     
     private Collection<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
@@ -109,6 +127,21 @@ public final class ProxySQLExecutor {
         return rawExecutor.execute(executionGroups, new RawSQLExecutorCallback());
     }
     
+    private Collection<ExecuteResult> useCalciteToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
+                                                          final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
+        if (executionContext.getExecutionUnits().isEmpty()) {
+            return Collections.emptyList();
+        }
+        MetaDataContexts metaData = ProxyContext.getInstance().getMetaDataContexts();
+        ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(type, metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(), 
+                executionContext.getSqlStatementContext().getSqlStatement(), backendConnection, isExceptionThrown, isReturnGeneratedKeys, true);
+        CalciteInternalExecutor executor = new CalciteInternalExecutor(rules, maxConnectionsSizePerQuery, backendConnection, jdbcExecutor.getJdbcExecutor(), executionContext, callback);
+        CalciteExecutor calciteExecutor = new CalciteJDBCExecutor(metaData.getCalciteContextFactory().create(backendConnection.getSchemaName(), executor));
+        backendConnection.setCalciteExecutor(calciteExecutor);
+        SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+        return calciteExecutor.executeQuery(sqlUnit.getSql(), sqlUnit.getParameters()).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
+    }
+    
     private Collection<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, 
                                                          final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index afa6ec3..ff7a493 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
 import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
@@ -67,6 +68,9 @@ public final class BackendConnection implements ExecutorJDBCManager {
     @Setter
     private String username;
     
+    @Setter
+    private CalciteExecutor calciteExecutor;
+    
     private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
     
     private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();
@@ -284,4 +288,21 @@ public final class BackendConnection implements ExecutorJDBCManager {
         connectionStatus.switchToReleased();
         return result;
     }
+    
+    /**
+     * Close calcite executor.
+     * 
+     * @return SQL exception when calcite executor close
+     */
+    public synchronized Collection<SQLException> closeCalciteExecutor() {
+        Collection<SQLException> result = new LinkedList<>();
+        if (null != calciteExecutor) {
+            try {
+                calciteExecutor.close();
+            } catch (final SQLException ex) {
+                result.add(ex);
+            }
+        }
+        return result;
+    }
 }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 4e8526a..9adcaa8 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor;
 
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -41,6 +42,7 @@ public final class ProxyJDBCExecutor {
     
     private final BackendConnection backendConnection;
     
+    @Getter
     private final JDBCExecutor jdbcExecutor;
     
     /**
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 5a47d28..74a26e3 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -123,6 +123,7 @@ public final class CommandExecutorTask implements Runnable {
         PrimaryVisitedManager.clear();
         result.addAll(backendConnection.closeResultSets());
         result.addAll(backendConnection.closeStatements());
+        result.addAll(backendConnection.closeCalciteExecutor());
         return result;
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index f934934..fdd3c0d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -94,6 +94,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
         backendConnection.closeResultSets();
         backendConnection.closeStatements();
         backendConnection.closeConnections(true);
+        backendConnection.closeCalciteExecutor();
         ChannelThreadExecutorGroup.getInstance().unregister(context.channel().id());
     }
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 81f2611..b405292 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -104,6 +104,7 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
         when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+        when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -124,6 +125,7 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
         when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+        when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -149,6 +151,7 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
         when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+        when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(connectionStatus).waitUntilConnectionRelease();
@@ -169,6 +172,7 @@ public final class CommandExecutorTaskTest {
         when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
         when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
         when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+        when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
         CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
         actual.run();
         verify(handlerContext, atLeast(2)).writeAndFlush(databasePacket);