You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2021/01/03 23:35:38 UTC
[shardingsphere] branch master updated: Use CalciteInternalExecutor
in PROXY (#8869)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 50784a2 Use CalciteInternalExecutor in PROXY (#8869)
50784a2 is described below
commit 50784a276555c593218525c0aa5eed923a3524e1
Author: Juan Pan(Trista) <pa...@apache.org>
AuthorDate: Mon Jan 4 07:35:04 2021 +0800
Use CalciteInternalExecutor in PROXY (#8869)
* Use CalciteInternalExecutor in PROXY
* check style
* fix ut
---
.../optimize/execute/CalciteInternalExecutor.java | 6 ++--
.../backend/communication/ProxySQLExecutor.java | 41 +++++++++++++++++++---
.../jdbc/connection/BackendConnection.java | 21 +++++++++++
.../jdbc/executor/ProxyJDBCExecutor.java | 2 ++
.../frontend/command/CommandExecutorTask.java | 1 +
.../netty/FrontendChannelInboundHandler.java | 1 +
.../frontend/command/CommandExecutorTaskTest.java | 4 +++
7 files changed, 70 insertions(+), 6 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java
index af3d53e..7976f6c 100644
--- a/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-optimize/src/main/java/org/apache/shardingsphere/infra/optimize/execute/CalciteInternalExecutor.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.stream.Collectors;
/**
* Calcite internal executor.
@@ -52,7 +54,7 @@ public final class CalciteInternalExecutor {
private final ExecutionContext executionContext;
- private final JDBCExecutorCallback<QueryResult> callback;
+ private final JDBCExecutorCallback<? extends ExecuteResult> callback;
/**
* Execute.
@@ -62,7 +64,7 @@ public final class CalciteInternalExecutor {
public Collection<QueryResult> execute() {
try {
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups = createExecutionGroups(executionContext);
- return jdbcExecutor.execute(executionGroups, callback);
+ return jdbcExecutor.execute(executionGroups, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList());
} catch (final SQLException ex) {
throw new ShardingSphereException(ex);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 59b91fb..f61856d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -18,24 +18,31 @@
package org.apache.shardingsphere.proxy.backend.communication;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
+import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteInternalExecutor;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteJDBCExecutor;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.ProxyJDBCExecutor;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallback;
+import org.apache.shardingsphere.proxy.backend.communication.jdbc.executor.callback.ProxyJDBCExecutorCallbackFactory;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction.TransactionStatus;
-import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
@@ -47,6 +54,8 @@ import org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
/**
* Proxy SQL Executor.
@@ -97,9 +106,18 @@ public final class ProxySQLExecutor {
Collection<ShardingSphereRule> rules = ProxyContext.getInstance().getMetaDataContexts().getMetaData(backendConnection.getSchemaName()).getRuleMetaData().getRules();
int maxConnectionsSizePerQuery = ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
boolean isReturnGeneratedKeys = executionContext.getSqlStatementContext().getSqlStatement() instanceof InsertStatement;
- return rules.stream().anyMatch(each -> each instanceof RawExecutionRule)
- ? rawExecute(executionContext, rules, maxConnectionsSizePerQuery)
- : useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
+ return execute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys);
+ }
+
+ private Collection<ExecuteResult> execute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
+ final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys) throws SQLException {
+ if (rules.stream().anyMatch(each -> each instanceof RawExecutionRule)) {
+ return rawExecute(executionContext, rules, maxConnectionsSizePerQuery);
+ }
+ if (executionContext.getRouteContext().isToCalcite()) {
+ return useCalciteToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
+ }
+ return useDriverToExecute(executionContext, rules, maxConnectionsSizePerQuery, isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown());
}
private Collection<ExecuteResult> rawExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules, final int maxConnectionsSizePerQuery) throws SQLException {
@@ -109,6 +127,21 @@ public final class ProxySQLExecutor {
return rawExecutor.execute(executionGroups, new RawSQLExecutorCallback());
}
+ private Collection<ExecuteResult> useCalciteToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
+ final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
+ if (executionContext.getExecutionUnits().isEmpty()) {
+ return Collections.emptyList();
+ }
+ MetaDataContexts metaData = ProxyContext.getInstance().getMetaDataContexts();
+ ProxyJDBCExecutorCallback callback = ProxyJDBCExecutorCallbackFactory.newInstance(type, metaData.getMetaData(backendConnection.getSchemaName()).getResource().getDatabaseType(),
+ executionContext.getSqlStatementContext().getSqlStatement(), backendConnection, isExceptionThrown, isReturnGeneratedKeys, true);
+ CalciteInternalExecutor executor = new CalciteInternalExecutor(rules, maxConnectionsSizePerQuery, backendConnection, jdbcExecutor.getJdbcExecutor(), executionContext, callback);
+ CalciteExecutor calciteExecutor = new CalciteJDBCExecutor(metaData.getCalciteContextFactory().create(backendConnection.getSchemaName(), executor));
+ backendConnection.setCalciteExecutor(calciteExecutor);
+ SQLUnit sqlUnit = executionContext.getExecutionUnits().iterator().next().getSqlUnit();
+ return calciteExecutor.executeQuery(sqlUnit.getSql(), sqlUnit.getParameters()).stream().map(each -> (ExecuteResult) each).collect(Collectors.toList());
+ }
+
private Collection<ExecuteResult> useDriverToExecute(final ExecutionContext executionContext, final Collection<ShardingSphereRule> rules,
final int maxConnectionsSizePerQuery, final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = new DriverExecutionPrepareEngine<>(
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
index afa6ec3..ff7a493 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/BackendConnection.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.infra.exception.ShardingSphereException;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.optimize.execute.CalciteExecutor;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.statement.StatementMemoryStrictlyFetchSizeSetter;
@@ -67,6 +68,9 @@ public final class BackendConnection implements ExecutorJDBCManager {
@Setter
private String username;
+ @Setter
+ private CalciteExecutor calciteExecutor;
+
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
private final Collection<Statement> cachedStatements = new CopyOnWriteArrayList<>();
@@ -284,4 +288,21 @@ public final class BackendConnection implements ExecutorJDBCManager {
connectionStatus.switchToReleased();
return result;
}
+
+ /**
+ * Close calcite executor.
+ *
+ * @return SQL exception when calcite executor close
+ */
+ public synchronized Collection<SQLException> closeCalciteExecutor() {
+ Collection<SQLException> result = new LinkedList<>();
+ if (null != calciteExecutor) {
+ try {
+ calciteExecutor.close();
+ } catch (final SQLException ex) {
+ result.add(ex);
+ }
+ }
+ return result;
+ }
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
index 4e8526a..9adcaa8 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/executor/ProxyJDBCExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.proxy.backend.communication.jdbc.executor;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -41,6 +42,7 @@ public final class ProxyJDBCExecutor {
private final BackendConnection backendConnection;
+ @Getter
private final JDBCExecutor jdbcExecutor;
/**
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
index 5a47d28..74a26e3 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java
@@ -123,6 +123,7 @@ public final class CommandExecutorTask implements Runnable {
PrimaryVisitedManager.clear();
result.addAll(backendConnection.closeResultSets());
result.addAll(backendConnection.closeStatements());
+ result.addAll(backendConnection.closeCalciteExecutor());
return result;
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
index f934934..fdd3c0d 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java
@@ -94,6 +94,7 @@ public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAd
backendConnection.closeResultSets();
backendConnection.closeStatements();
backendConnection.closeConnections(true);
+ backendConnection.closeCalciteExecutor();
ChannelThreadExecutorGroup.getInstance().unregister(context.channel().id());
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
index 81f2611..b405292 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java
@@ -104,6 +104,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+ when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -124,6 +125,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+ when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -149,6 +151,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+ when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(connectionStatus).waitUntilConnectionRelease();
@@ -169,6 +172,7 @@ public final class CommandExecutorTaskTest {
when(backendConnection.closeResultSets()).thenReturn(Collections.emptyList());
when(backendConnection.closeStatements()).thenReturn(Collections.emptyList());
when(backendConnection.closeConnections(false)).thenReturn(Collections.emptyList());
+ when(backendConnection.closeCalciteExecutor()).thenReturn(Collections.emptyList());
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(handlerContext, atLeast(2)).writeAndFlush(databasePacket);