You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/07 03:59:18 UTC
[shardingsphere] branch master updated: support transaction manager for traffic (#14578)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c676417 support transaction manager for traffic (#14578)
c676417 is described below
commit c6764176e008bf8fe153bb2ad176db9b8d96a25c
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Fri Jan 7 11:58:24 2022 +0800
support transaction manager for traffic (#14578)
---
.../shardingsphere-jdbc-core/pom.xml | 1 +
.../driver/executor/DriverExecutor.java | 3 +-
.../jdbc/core/connection/ConnectionManager.java | 68 +++++++-
.../statement/ShardingSpherePreparedStatement.java | 31 ++--
.../core/statement/ShardingSphereStatement.java | 54 +++---
.../core/connection/ConnectionManagerTest.java | 94 ++++++++++-
.../shardingsphere-traffic-core/pom.xml | 5 -
.../traffic/context/TrafficContext.java | 17 +-
.../traffic/engine/TrafficEngine.java | 9 +-
.../traffic/executor/TrafficExecutor.java | 79 +++++----
.../traffic/executor/TrafficExecutorFactory.java | 41 -----
.../executor/context/TrafficExecutorContext.java | 33 ----
.../PreparedStatementExecutorContextBuilder.java | 42 -----
.../builder/StatementExecutorContextBuilder.java | 42 -----
.../builder/TrafficExecutorContextBuilder.java | 42 -----
.../traffic/executor/jdbc/JDBCTrafficExecutor.java | 185 ---------------------
...r.context.builder.TrafficExecutorContextBuilder | 19 ---
17 files changed, 257 insertions(+), 508 deletions(-)
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml b/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml
index 442c494..dbfdde1 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml
@@ -143,6 +143,7 @@
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index b3015a8..779c46c 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
import org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorFactory;
import java.sql.SQLException;
@@ -49,7 +48,7 @@ public final class DriverExecutor implements AutoCloseable {
regularExecutor = new DriverJDBCExecutor(connection.getSchema(), metaDataContexts, jdbcExecutor);
rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
federationExecutor = FederationExecutorFactory.newInstance(connection.getSchema(), metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), jdbcExecutor);
- trafficExecutor = TrafficExecutorFactory.newInstance(connection.getSchema(), metaDataContexts);
+ trafficExecutor = new TrafficExecutor();
}
/**
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
index 80ff54e..b6f0e94 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
@@ -20,13 +20,25 @@ package org.apache.shardingsphere.driver.jdbc.core.connection;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.zaxxer.hikari.HikariDataSource;
import lombok.Getter;
import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
import org.apache.shardingsphere.driver.jdbc.adapter.invocation.MethodInvocationRecorder;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
+import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceId;
+import org.apache.shardingsphere.infra.instance.InstanceType;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.ConnectionTransaction;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
@@ -41,6 +53,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -51,7 +64,9 @@ import java.util.Random;
*/
public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseable {
- private final Map<String, DataSource> dataSourceMap;
+ private final Map<String, DataSource> dataSourceMap = new LinkedHashMap<>();
+
+ private final Map<String, DataSource> physicalDataSourceMap = new LinkedHashMap<>();
@Getter
private final ConnectionTransaction connectionTransaction;
@@ -65,10 +80,56 @@ public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseab
private final Random random = new SecureRandom();
public ConnectionManager(final String schema, final ContextManager contextManager) {
- dataSourceMap = contextManager.getDataSourceMap(schema);
+ dataSourceMap.putAll(contextManager.getDataSourceMap(schema));
+ dataSourceMap.putAll(getTrafficDataSourceMap(schema, contextManager));
+ physicalDataSourceMap.putAll(contextManager.getDataSourceMap(schema));
connectionTransaction = createConnectionTransaction(schema, contextManager);
}
+ private Map<String, DataSource> getTrafficDataSourceMap(final String schema, final ContextManager contextManager) {
+ Optional<TrafficRule> trafficRule = contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TrafficRule.class);
+ Optional<MetaDataPersistService> metaDataPersistService = contextManager.getMetaDataContexts().getMetaDataPersistService();
+ if (!trafficRule.isPresent() || !metaDataPersistService.isPresent()) {
+ return Collections.emptyMap();
+ }
+ Map<String, DataSourceConfiguration> dataSourceConfigs = metaDataPersistService.get().getDataSourceService().load(schema);
+ Preconditions.checkState(!dataSourceConfigs.isEmpty(), "Can not get dataSource configurations from meta data.");
+ DataSourceConfiguration dataSourceConfigSample = dataSourceConfigs.values().iterator().next();
+ Collection<ComputeNodeInstance> instances = metaDataPersistService.get().loadComputeNodeInstances(InstanceType.PROXY, trafficRule.get().getLabels());
+ return DataSourcePoolCreatorUtil.getDataSourceMap(createDataSourceConfigs(instances, dataSourceConfigSample, schema));
+ }
+
+ private Map<String, DataSourceConfiguration> createDataSourceConfigs(final Collection<ComputeNodeInstance> instances,
+ final DataSourceConfiguration dataSourceConfigSample, final String schema) {
+ Map<String, DataSourceConfiguration> result = new LinkedHashMap<>();
+ for (ComputeNodeInstance each : instances) {
+ result.put(each.getInstanceDefinition().getInstanceId().getId(), createDataSourceConfig(each, dataSourceConfigSample, schema));
+ }
+ return result;
+ }
+
+ private DataSourceConfiguration createDataSourceConfig(final ComputeNodeInstance instance,
+ final DataSourceConfiguration dataSourceConfigSample, final String schema) {
+ Map<String, Object> props = dataSourceConfigSample.getProps();
+ props.put("jdbcUrl", createJdbcUrl(instance, schema, props));
+ Preconditions.checkState(!instance.getUsers().isEmpty(), "Can not get users from meta data.");
+ ShardingSphereUser user = instance.getUsers().iterator().next();
+ props.put("username", user.getGrantee().getUsername());
+ props.put("password", user.getPassword());
+ DataSourceConfiguration result = new DataSourceConfiguration(HikariDataSource.class.getName());
+ result.getProps().putAll(props);
+ return result;
+ }
+
+ private String createJdbcUrl(final ComputeNodeInstance instance, final String schema, final Map<String, Object> props) {
+ String jdbcUrl = String.valueOf(props.get("jdbcUrl"));
+ String username = String.valueOf(props.get("username"));
+ DataSourceMetaData dataSourceMetaData = DatabaseTypeRegistry.getDatabaseTypeByURL(jdbcUrl).getDataSourceMetaData(jdbcUrl, username);
+ InstanceId instanceId = instance.getInstanceDefinition().getInstanceId();
+ return jdbcUrl.replace(dataSourceMetaData.getHostname(), instanceId.getIp())
+ .replace(String.valueOf(dataSourceMetaData.getPort()), String.valueOf(instanceId.getPort())).replace(dataSourceMetaData.getCatalog(), schema);
+ }
+
private ConnectionTransaction createConnectionTransaction(final String schemaName, final ContextManager contextManager) {
TransactionType type = TransactionTypeHolder.get();
if (null == type) {
@@ -174,7 +235,8 @@ public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseab
* @return random physical data source name
*/
public String getRandomPhysicalDataSourceName() {
- Collection<String> datasourceNames = cachedConnections.isEmpty() ? dataSourceMap.keySet() : cachedConnections.keySet();
+ Collection<String> cachedPhysicalDataSourceNames = Sets.intersection(physicalDataSourceMap.keySet(), cachedConnections.keySet());
+ Collection<String> datasourceNames = cachedPhysicalDataSourceNames.isEmpty() ? physicalDataSourceMap.keySet() : cachedPhysicalDataSourceNames;
return new ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));
}
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 3895e5b..fd4caaa 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -75,8 +75,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import org.apache.shardingsphere.traffic.context.TrafficContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import java.sql.Connection;
@@ -188,10 +186,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
clearPrevious();
LogicSQL logicSQL = createLogicSQL();
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
- return trafficExecutor.executeQuery(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).executeQuery());
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeQuery());
}
// TODO move federation route logic to binder
executionContext = createExecutionContext(logicSQL);
@@ -206,6 +203,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
}
}
+ private JDBCExecutionUnit createTrafficExecutionUnit(final TrafficContext trafficContext) throws SQLException {
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
+ ExecutionGroupContext<JDBCExecutionUnit> context = prepareEngine.prepare(trafficContext.getRouteContext(), trafficContext.getExecutionUnits());
+ return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(() -> new ShardingSphereException("Can not get traffic execution unit."));
+ }
+
private TrafficContext createTrafficContext(final LogicSQL logicSQL) {
Optional<TrafficRule> trafficRule = metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class);
return trafficRule.map(optional -> new TrafficEngine(optional, metaDataContexts).dispatch(logicSQL)).orElse(new TrafficContext());
@@ -260,10 +263,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
clearPrevious();
LogicSQL logicSQL = createLogicSQL();
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
- return trafficExecutor.executeUpdate(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
}
executionContext = createExecutionContext(logicSQL);
if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -313,10 +315,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
clearPrevious();
LogicSQL logicSQL = createLogicSQL();
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
- return trafficExecutor.execute(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).execute());
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute());
}
executionContext = createExecutionContext(logicSQL);
if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -369,7 +370,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
if (null != currentResultSet) {
return currentResultSet;
}
- if (trafficContext.getInstanceId().isPresent()) {
+ if (trafficContext.isMatchTraffic()) {
return executor.getTrafficExecutor().getResultSet();
}
if (executionContext.getRouteContext().isFederated()) {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 616311f..78b4597 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -71,9 +71,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import org.apache.shardingsphere.traffic.context.TrafficContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import java.sql.Connection;
@@ -143,10 +140,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
- return trafficExecutor.executeQuery(logicSQL, context, Statement::executeQuery);
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, Statement::executeQuery);
}
executionContext = createExecutionContext(logicSQL);
// TODO move federation route logic to binder
@@ -201,10 +197,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
- return trafficExecutor.executeUpdate(logicSQL, context, Statement::executeUpdate);
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, Statement::executeUpdate);
}
executionContext = createExecutionContext(logicSQL);
if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -227,10 +222,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
- return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
}
executionContext = createExecutionContext(logicSQL);
if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -251,10 +245,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
- return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
}
executionContext = createExecutionContext(logicSQL);
if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -275,10 +268,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
- return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
}
executionContext = createExecutionContext(logicSQL);
if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -364,15 +356,13 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
return executor.getRegularExecutor().execute(executionGroupContext, executionContext.getLogicSQL(), routeUnits, jdbcExecutorCallback);
}
- @SuppressWarnings({"rawtypes", "unchecked"})
private boolean execute0(final String sql, final ExecuteCallback callback) throws SQLException {
try {
LogicSQL logicSQL = createLogicSQL(sql);
trafficContext = createTrafficContext(logicSQL);
- if (trafficContext.getInstanceId().isPresent()) {
- TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
- TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
- return trafficExecutor.execute(logicSQL, context, (TrafficExecutorCallback) (statement, actualSQL) -> callback.execute(actualSQL, statement));
+ if (trafficContext.isMatchTraffic()) {
+ JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+ return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> callback.execute(actualSQL, statement));
}
executionContext = createExecutionContext(logicSQL);
if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -393,6 +383,12 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
}
}
+ private JDBCExecutionUnit createTrafficExecutionUnit(final TrafficContext trafficContext) throws SQLException {
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
+ ExecutionGroupContext<JDBCExecutionUnit> context = prepareEngine.prepare(trafficContext.getRouteContext(), trafficContext.getExecutionUnits());
+ return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(() -> new ShardingSphereException("Can not get traffic execution unit."));
+ }
+
private void clearStatements() throws SQLException {
for (Statement each : statements) {
each.close();
@@ -450,7 +446,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
if (null != currentResultSet) {
return currentResultSet;
}
- if (trafficContext.getInstanceId().isPresent()) {
+ if (trafficContext.isMatchTraffic()) {
return executor.getTrafficExecutor().getResultSet();
}
if (executionContext.getRouteContext().isFederated()) {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
index 421f71a..e3dc9bd 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
@@ -17,41 +17,110 @@
package org.apache.shardingsphere.driver.jdbc.core.connection;
+import com.google.common.collect.Sets;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceType;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
public final class ConnectionManagerTest {
private ConnectionManager connectionManager;
+ private MockedStatic<DataSourcePoolCreatorUtil> dataSourcePoolCreatorUtil;
+
@Before
public void setUp() throws SQLException {
connectionManager = new ConnectionManager(DefaultSchema.LOGIC_NAME, mockContextManager());
}
+ @After
+ public void cleanUp() {
+ dataSourcePoolCreatorUtil.close();
+ }
+
private ContextManager mockContextManager() throws SQLException {
ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
Map<String, DataSource> dataSourceMap = mockDataSourceMap();
+ TrafficRule trafficRule = mockTrafficRule();
+ MetaDataPersistService metaDataPersistService = mockMetaDataPersistService();
when(result.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+ when(result.getMetaDataContexts().getMetaDataPersistService()).thenReturn(Optional.of(metaDataPersistService));
when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+ when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TrafficRule.class)).thenReturn(Optional.of(trafficRule));
+ dataSourcePoolCreatorUtil = mockStatic(DataSourcePoolCreatorUtil.class);
+ Map<String, DataSource> trafficDataSourceMap = mockTrafficDataSourceMap();
+ when(DataSourcePoolCreatorUtil.getDataSourceMap(any())).thenReturn(trafficDataSourceMap);
+ return result;
+ }
+
+ private Map<String, DataSource> mockTrafficDataSourceMap() {
+ Map<String, DataSource> trafficDataSourceMap = new LinkedHashMap<>();
+ trafficDataSourceMap.put("127.0.0.1@3307", mock(DataSource.class));
+ return trafficDataSourceMap;
+ }
+
+ private MetaDataPersistService mockMetaDataPersistService() {
+ MetaDataPersistService result = mock(MetaDataPersistService.class, RETURNS_DEEP_STUBS);
+ when(result.getDataSourceService().load(DefaultSchema.LOGIC_NAME)).thenReturn(mockDataSourceConfigMap());
+ when(result.loadComputeNodeInstances(InstanceType.PROXY, Arrays.asList("OLTP", "OLAP"))).thenReturn(Collections.singletonList(mockComputeNodeInstance()));
+ return result;
+ }
+
+ private Map<String, DataSourceConfiguration> mockDataSourceConfigMap() {
+ Map<String, DataSourceConfiguration> result = new LinkedHashMap<>();
+ DataSourceConfiguration dataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+ result.put(DefaultSchema.LOGIC_NAME, dataSourceConfig);
+ dataSourceConfig.getProps().put("jdbcUrl", "jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false");
+ dataSourceConfig.getProps().put("username", "root");
+ dataSourceConfig.getProps().put("password", "123456");
+ return result;
+ }
+
+ private ComputeNodeInstance mockComputeNodeInstance() {
+ ComputeNodeInstance result = new ComputeNodeInstance();
+ result.setUsers(Collections.singletonList(new ShardingSphereUser("root", "root", "localhost")));
+ result.setLabels(Collections.singletonList("OLTP"));
+ result.setInstanceDefinition(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
+ return result;
+ }
+
+ private TrafficRule mockTrafficRule() {
+ TrafficRule result = mock(TrafficRule.class);
+ when(result.getLabels()).thenReturn(Arrays.asList("OLTP", "OLAP"));
return result;
}
@@ -65,10 +134,9 @@ public final class ConnectionManagerTest {
}
@Test
- public void assertGetRandomPhysicalDataSourceNameFromContextManager() throws SQLException {
- connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+ public void assertGetRandomPhysicalDataSourceNameFromContextManager() {
String actual = connectionManager.getRandomPhysicalDataSourceName();
- assertThat(actual, is("ds"));
+ assertTrue(Sets.newHashSet("ds", "invalid_ds").contains(actual));
}
@Test
@@ -85,6 +153,12 @@ public final class ConnectionManagerTest {
}
@Test
+ public void assertGetConnectionWhenConfigTrafficRule() throws SQLException {
+ assertThat(connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY),
+ is(connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY)));
+ }
+
+ @Test
public void assertGetConnectionsWhenAllInCache() throws SQLException {
Connection expected = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.CONNECTION_STRICTLY);
@@ -93,12 +167,26 @@ public final class ConnectionManagerTest {
}
@Test
+ public void assertGetConnectionsWhenConfigTrafficRuleAndAllInCache() throws SQLException {
+ Connection expected = connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
+ List<Connection> actual = connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.CONNECTION_STRICTLY);
+ assertThat(actual.size(), is(1));
+ assertThat(actual.get(0), is(expected));
+ }
+
+ @Test
public void assertGetConnectionsWhenEmptyCache() throws SQLException {
List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
assertThat(actual.size(), is(1));
}
@Test
+ public void assertGetConnectionsWhenConfigTrafficRuleAndEmptyCache() throws SQLException {
+ List<Connection> actual = connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY);
+ assertThat(actual.size(), is(1));
+ }
+
+ @Test
public void assertGetConnectionsWhenPartInCacheWithMemoryStrictlyMode() throws SQLException {
connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
List<Connection> actual = connectionManager.getConnections("ds", 3, ConnectionMode.MEMORY_STRICTLY);
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml
index 4f4cbe5..6ac16ff 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml
@@ -48,10 +48,5 @@
<artifactId>shardingsphere-cluster-mode-core</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>com.zaxxer</groupId>
- <artifactId>HikariCP</artifactId>
- <scope>compile</scope>
- </dependency>
</dependencies>
</project>
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
index eb38ab7..4313df4 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
@@ -19,8 +19,11 @@ package org.apache.shardingsphere.traffic.context;
import lombok.Getter;
import lombok.Setter;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.route.context.RouteContext;
-import java.util.Optional;
+import java.util.Collection;
+import java.util.LinkedList;
/**
* Traffic context.
@@ -29,14 +32,16 @@ import java.util.Optional;
@Setter
public final class TrafficContext {
- private String instanceId;
+ private RouteContext routeContext = new RouteContext();
+
+ private Collection<ExecutionUnit> executionUnits = new LinkedList<>();
/**
- * Get instance id.
+ * Judge whether statement is match traffic or not.
*
- * @return instance id
+ * @return whether statement is match traffic or not
*/
- public Optional<String> getInstanceId() {
- return Optional.ofNullable(instanceId);
+ public boolean isMatchTraffic() {
+ return executionUnits.size() > 0;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
index 76284f5..69b560a 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.traffic.engine;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.InstanceType;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -57,11 +59,16 @@ public final class TrafficEngine {
List<String> instanceIds = getInstanceIdsByLabels(strategyRule.get().getLabels());
if (!instanceIds.isEmpty()) {
TrafficLoadBalanceAlgorithm loadBalancer = trafficRule.findLoadBalancer(strategyRule.get().getLoadBalancerName());
- result.setInstanceId(loadBalancer.getInstanceId(strategyRule.get().getName(), instanceIds));
+ String instanceId = loadBalancer.getInstanceId(strategyRule.get().getName(), instanceIds);
+ result.getExecutionUnits().add(createExecutionUnit(logicSQL, instanceId));
}
return result;
}
+ private ExecutionUnit createExecutionUnit(final LogicSQL logicSQL, final String instanceId) {
+ return new ExecutionUnit(instanceId, new SQLUnit(logicSQL.getSql(), logicSQL.getParameters()));
+ }
+
private List<String> getInstanceIdsByLabels(final Collection<String> labels) {
List<String> result = new ArrayList<>();
if (metaDataContexts.getMetaDataPersistService().isPresent()) {
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index f80df43..cd09896 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -17,61 +17,52 @@
package org.apache.shardingsphere.traffic.executor;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.List;
/**
* Traffic executor.
*/
-public interface TrafficExecutor extends AutoCloseable {
+public final class TrafficExecutor implements AutoCloseable {
- /**
- * Prepare for traffic executor.
- *
- * @param logicSQL logic SQL
- * @param instanceId instance id
- * @param type type
- * @return traffic executor context
- * @throws SQLException SQL exception
- */
- TrafficExecutorContext<Statement> prepare(LogicSQL logicSQL, String instanceId, String type) throws SQLException;
+ private Statement statement;
/**
- * Execute query.
+ * Execute.
*
- * @param logicSQL logic SQL
- * @param context traffic executor context
+ * @param executionUnit execution unit
* @param callback traffic executor callback
- * @return result set
+ * @param <T> return type
+ * @return execute result
* @throws SQLException SQL exception
*/
- ResultSet executeQuery(LogicSQL logicSQL, TrafficExecutorContext<Statement> context, TrafficExecutorCallback<ResultSet> callback) throws SQLException;
+ public <T> T execute(final JDBCExecutionUnit executionUnit, final TrafficExecutorCallback<T> callback) throws SQLException {
+ SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit();
+ cacheStatement(sqlUnit.getParameters(), executionUnit.getStorageResource());
+ return callback.execute(statement, sqlUnit.getSql());
+ }
- /**
- * Execute update.
- *
- * @param logicSQL logic SQL
- * @param context traffic executor context
- * @param callback traffic executor callback
- * @return update count
- * @throws SQLException SQL exception
- */
- int executeUpdate(LogicSQL logicSQL, TrafficExecutorContext<Statement> context, TrafficExecutorCallback<Integer> callback) throws SQLException;
+ private void cacheStatement(final List<Object> parameters, final Statement statement) throws SQLException {
+ this.statement = statement;
+ setParameters(statement, parameters);
+ }
- /**
- * Execute.
- *
- * @param logicSQL logic SQL
- * @param context traffic executor context
- * @param callback traffic executor callback
- * @return whether execute success or not
- * @throws SQLException SQL exception
- */
- boolean execute(LogicSQL logicSQL, TrafficExecutorContext<Statement> context, TrafficExecutorCallback<Boolean> callback) throws SQLException;
+ private void setParameters(final Statement statement, final List<Object> parameters) throws SQLException {
+ if (!(statement instanceof PreparedStatement)) {
+ return;
+ }
+ int index = 1;
+ for (Object each : parameters) {
+ ((PreparedStatement) statement).setObject(index++, each);
+ }
+ }
/**
* Get result set.
@@ -79,8 +70,16 @@ public interface TrafficExecutor extends AutoCloseable {
* @return result set
* @throws SQLException SQL exception
*/
- ResultSet getResultSet() throws SQLException;
+ public ResultSet getResultSet() throws SQLException {
+ return statement.getResultSet();
+ }
@Override
- void close() throws SQLException;
+ public void close() throws SQLException {
+ if (null != statement) {
+ Connection connection = statement.getConnection();
+ statement.close();
+ connection.close();
+ }
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorFactory.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorFactory.java
deleted file mode 100644
index 0850775..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.traffic.executor.jdbc.JDBCTrafficExecutor;
-
-/**
- * Traffic executor factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class TrafficExecutorFactory {
-
- /**
- * Create new instance of traffic executor factory.
- *
- * @param schema schema
- * @param metaDataContexts meta data contexts
- * @return new instance of traffic executor
- */
- public static TrafficExecutor newInstance(final String schema, final MetaDataContexts metaDataContexts) {
- return new JDBCTrafficExecutor(schema, metaDataContexts);
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/TrafficExecutorContext.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/TrafficExecutorContext.java
deleted file mode 100644
index d2391e8..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/TrafficExecutorContext.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-import java.sql.Statement;
-
-/**
- * Traffic executor context.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TrafficExecutorContext<T extends Statement> {
-
- private final T statement;
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/PreparedStatementExecutorContextBuilder.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/PreparedStatementExecutorContextBuilder.java
deleted file mode 100644
index 5496244..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/PreparedStatementExecutorContextBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context.builder;
-
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-/**
- * Prepared statement executor context builder.
- */
-public final class PreparedStatementExecutorContextBuilder implements TrafficExecutorContextBuilder<PreparedStatement> {
-
- @Override
- public TrafficExecutorContext<PreparedStatement> build(final LogicSQL logicSQL, final Connection connection) throws SQLException {
- return new TrafficExecutorContext<>(connection.prepareStatement(logicSQL.getSql()));
- }
-
- @Override
- public String getType() {
- return JDBCDriverType.PREPARED_STATEMENT;
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/StatementExecutorContextBuilder.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/StatementExecutorContextBuilder.java
deleted file mode 100644
index ea29fa6..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/StatementExecutorContextBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context.builder;
-
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * Statement executor context builder.
- */
-public final class StatementExecutorContextBuilder implements TrafficExecutorContextBuilder<Statement> {
-
- @Override
- public TrafficExecutorContext<Statement> build(final LogicSQL logicSQL, final Connection connection) throws SQLException {
- return new TrafficExecutorContext<>(connection.createStatement());
- }
-
- @Override
- public String getType() {
- return JDBCDriverType.STATEMENT;
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/TrafficExecutorContextBuilder.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/TrafficExecutorContextBuilder.java
deleted file mode 100644
index 33daef3..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/TrafficExecutorContextBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context.builder;
-
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.spi.typed.TypedSPI;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * Traffic executor context builder.
- */
-public interface TrafficExecutorContextBuilder<T extends Statement> extends TypedSPI {
-
- /**
- * Build traffic executor context.
- *
- * @param logicSQL logic SQL
- * @param connection connection
- * @return traffic executor context
- * @throws SQLException SQL exception
- */
- TrafficExecutorContext<T> build(LogicSQL logicSQL, Connection connection) throws SQLException;
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
deleted file mode 100644
index 7e20d43..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.jdbc;
-
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.InstanceId;
-import org.apache.shardingsphere.infra.instance.InstanceType;
-import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-import org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder;
-import org.apache.shardingsphere.traffic.rule.TrafficRule;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * JDBC traffic executor.
- */
-public final class JDBCTrafficExecutor implements TrafficExecutor {
-
- private static final Map<String, TrafficExecutorContextBuilder<?>> TYPE_CONTEXT_BUILDERS = new ConcurrentHashMap<>();
-
- private static final String JDBC_URL = "jdbcUrl";
-
- private static final String USER_NAME = "username";
-
- private static final String PASSWORD = "password";
-
- private final Map<String, DataSource> dataSources = new LinkedHashMap<>();
-
- private Statement statement;
-
- static {
- ShardingSphereServiceLoader.register(TrafficExecutorContextBuilder.class);
- }
-
- public JDBCTrafficExecutor(final String schema, final MetaDataContexts metaDataContexts) {
- Optional<TrafficRule> trafficRule = metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class);
- if (trafficRule.isPresent() && metaDataContexts.getMetaDataPersistService().isPresent()) {
- Map<String, DataSourceConfiguration> dataSourceConfigs = metaDataContexts.getMetaDataPersistService().get().getDataSourceService().load(schema);
- if (dataSourceConfigs.isEmpty()) {
- throw new ShardingSphereException("Can not get dataSource configs from meta data.");
- }
- DataSourceConfiguration dataSourceConfigSample = dataSourceConfigs.values().iterator().next();
- Collection<ComputeNodeInstance> instances = metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY, trafficRule.get().getLabels());
- dataSources.putAll(DataSourcePoolCreatorUtil.getDataSourceMap(createDataSourceConfigs(instances, dataSourceConfigSample, schema)));
- }
- }
-
- private Map<String, DataSourceConfiguration> createDataSourceConfigs(final Collection<ComputeNodeInstance> instances,
- final DataSourceConfiguration dataSourceConfigSample, final String schema) {
- Map<String, DataSourceConfiguration> result = new LinkedHashMap<>();
- for (ComputeNodeInstance each : instances) {
- result.put(each.getInstanceDefinition().getInstanceId().getId(), createDataSourceConfig(each, dataSourceConfigSample, schema));
- }
- return result;
- }
-
- private DataSourceConfiguration createDataSourceConfig(final ComputeNodeInstance instance,
- final DataSourceConfiguration dataSourceConfigSample, final String schema) {
- Map<String, Object> props = dataSourceConfigSample.getProps();
- props.put(JDBC_URL, createJdbcUrl(instance, schema, props));
- if (instance.getUsers().isEmpty()) {
- throw new ShardingSphereException("Can not get users from meta data.");
- }
- ShardingSphereUser user = instance.getUsers().iterator().next();
- props.put(USER_NAME, user.getGrantee().getUsername());
- props.put(PASSWORD, user.getPassword());
- DataSourceConfiguration result = new DataSourceConfiguration(HikariDataSource.class.getName());
- result.getProps().putAll(props);
- return result;
- }
-
- private String createJdbcUrl(final ComputeNodeInstance instance, final String schema, final Map<String, Object> props) {
- String jdbcUrl = String.valueOf(props.get(JDBC_URL));
- String username = String.valueOf(props.get(USER_NAME));
- DataSourceMetaData dataSourceMetaData = DatabaseTypeRegistry.getDatabaseTypeByURL(jdbcUrl).getDataSourceMetaData(jdbcUrl, username);
- InstanceId instanceId = instance.getInstanceDefinition().getInstanceId();
- return jdbcUrl.replace(dataSourceMetaData.getHostname(), instanceId.getIp())
- .replace(String.valueOf(dataSourceMetaData.getPort()), String.valueOf(instanceId.getPort())).replace(dataSourceMetaData.getCatalog(), schema);
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Override
- public TrafficExecutorContext<Statement> prepare(final LogicSQL logicSQL, final String instanceId, final String type) throws SQLException {
- if (!dataSources.containsKey(instanceId)) {
- throw new ShardingSphereException("Can not get dataSource of %.", instanceId);
- }
- DataSource dataSource = dataSources.get(instanceId);
- TrafficExecutorContextBuilder builder = getCachedContextBuilder(type);
- return builder.build(logicSQL, dataSource.getConnection());
- }
-
- private TrafficExecutorContextBuilder<?> getCachedContextBuilder(final String type) {
- TrafficExecutorContextBuilder<?> result;
- if (null == (result = TYPE_CONTEXT_BUILDERS.get(type))) {
- result = TYPE_CONTEXT_BUILDERS.computeIfAbsent(type, key -> TypedSPIRegistry.getRegisteredService(TrafficExecutorContextBuilder.class, key, new Properties()));
- }
- return result;
- }
-
- @Override
- public ResultSet executeQuery(final LogicSQL logicSQL, final TrafficExecutorContext<Statement> context, final TrafficExecutorCallback<ResultSet> callback) throws SQLException {
- cacheStatement(logicSQL.getParameters(), context.getStatement());
- return callback.execute(statement, logicSQL.getSql());
- }
-
- private void cacheStatement(final List<Object> parameters, final Statement statement) throws SQLException {
- this.statement = statement;
- setParameters(statement, parameters);
- }
-
- private void setParameters(final Statement statement, final List<Object> parameters) throws SQLException {
- if (statement instanceof PreparedStatement) {
- int index = 1;
- for (Object each : parameters) {
- ((PreparedStatement) statement).setObject(index++, each);
- }
- }
- }
-
- @Override
- public int executeUpdate(final LogicSQL logicSQL, final TrafficExecutorContext<Statement> context, final TrafficExecutorCallback<Integer> callback) throws SQLException {
- cacheStatement(logicSQL.getParameters(), context.getStatement());
- return callback.execute(statement, logicSQL.getSql());
- }
-
- @Override
- public boolean execute(final LogicSQL logicSQL, final TrafficExecutorContext<Statement> context, final TrafficExecutorCallback<Boolean> callback) throws SQLException {
- cacheStatement(logicSQL.getParameters(), context.getStatement());
- return callback.execute(statement, logicSQL.getSql());
- }
-
- @Override
- public ResultSet getResultSet() throws SQLException {
- return statement.getResultSet();
- }
-
- @Override
- public void close() throws SQLException {
- if (null != statement) {
- Connection connection = statement.getConnection();
- statement.close();
- connection.close();
- }
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/resources/META-INF/services/org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/resources/META-INF/services/org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder
deleted file mode 100644
index da5b38f..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/resources/META-INF/services/org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.traffic.executor.context.builder.StatementExecutorContextBuilder
-org.apache.shardingsphere.traffic.executor.context.builder.PreparedStatementExecutorContextBuilder