You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/07 03:59:18 UTC

[shardingsphere] branch master updated: support transaction manager for traffic (#14578)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c676417  support transaction manager for traffic (#14578)
c676417 is described below

commit c6764176e008bf8fe153bb2ad176db9b8d96a25c
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Fri Jan 7 11:58:24 2022 +0800

    support transaction manager for traffic (#14578)
---
 .../shardingsphere-jdbc-core/pom.xml               |   1 +
 .../driver/executor/DriverExecutor.java            |   3 +-
 .../jdbc/core/connection/ConnectionManager.java    |  68 +++++++-
 .../statement/ShardingSpherePreparedStatement.java |  31 ++--
 .../core/statement/ShardingSphereStatement.java    |  54 +++---
 .../core/connection/ConnectionManagerTest.java     |  94 ++++++++++-
 .../shardingsphere-traffic-core/pom.xml            |   5 -
 .../traffic/context/TrafficContext.java            |  17 +-
 .../traffic/engine/TrafficEngine.java              |   9 +-
 .../traffic/executor/TrafficExecutor.java          |  79 +++++----
 .../traffic/executor/TrafficExecutorFactory.java   |  41 -----
 .../executor/context/TrafficExecutorContext.java   |  33 ----
 .../PreparedStatementExecutorContextBuilder.java   |  42 -----
 .../builder/StatementExecutorContextBuilder.java   |  42 -----
 .../builder/TrafficExecutorContextBuilder.java     |  42 -----
 .../traffic/executor/jdbc/JDBCTrafficExecutor.java | 185 ---------------------
 ...r.context.builder.TrafficExecutorContextBuilder |  19 ---
 17 files changed, 257 insertions(+), 508 deletions(-)

diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml b/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml
index 442c494..dbfdde1 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/pom.xml
@@ -143,6 +143,7 @@
         <dependency>
             <groupId>com.zaxxer</groupId>
             <artifactId>HikariCP</artifactId>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index b3015a8..779c46c 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.infra.federation.executor.FederationExecutor;
 import org.apache.shardingsphere.infra.federation.executor.FederationExecutorFactory;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorFactory;
 
 import java.sql.SQLException;
 
@@ -49,7 +48,7 @@ public final class DriverExecutor implements AutoCloseable {
         regularExecutor = new DriverJDBCExecutor(connection.getSchema(), metaDataContexts, jdbcExecutor);
         rawExecutor = new RawExecutor(metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), metaDataContexts.getProps());
         federationExecutor = FederationExecutorFactory.newInstance(connection.getSchema(), metaDataContexts.getOptimizerContext(), metaDataContexts.getProps(), jdbcExecutor);
-        trafficExecutor = TrafficExecutorFactory.newInstance(connection.getSchema(), metaDataContexts);
+        trafficExecutor = new TrafficExecutor();
     }
     
     /**
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
index 80ff54e..b6f0e94 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManager.java
@@ -20,13 +20,25 @@ package org.apache.shardingsphere.driver.jdbc.core.connection;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.zaxxer.hikari.HikariDataSource;
 import lombok.Getter;
 import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
 import org.apache.shardingsphere.driver.jdbc.adapter.invocation.MethodInvocationRecorder;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
+import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.ExecutorJDBCManager;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceId;
+import org.apache.shardingsphere.infra.instance.InstanceType;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.ConnectionTransaction;
 import org.apache.shardingsphere.transaction.core.TransactionType;
 import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
@@ -41,6 +53,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -51,7 +64,9 @@ import java.util.Random;
  */
 public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseable {
     
-    private final Map<String, DataSource> dataSourceMap;
+    private final Map<String, DataSource> dataSourceMap = new LinkedHashMap<>();
+    
+    private final Map<String, DataSource> physicalDataSourceMap = new LinkedHashMap<>();
     
     @Getter
     private final ConnectionTransaction connectionTransaction;
@@ -65,10 +80,56 @@ public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseab
     private final Random random = new SecureRandom();
     
     public ConnectionManager(final String schema, final ContextManager contextManager) {
-        dataSourceMap = contextManager.getDataSourceMap(schema);
+        dataSourceMap.putAll(contextManager.getDataSourceMap(schema));
+        dataSourceMap.putAll(getTrafficDataSourceMap(schema, contextManager));
+        physicalDataSourceMap.putAll(contextManager.getDataSourceMap(schema));
         connectionTransaction = createConnectionTransaction(schema, contextManager);
     }
     
+    private Map<String, DataSource> getTrafficDataSourceMap(final String schema, final ContextManager contextManager) {
+        Optional<TrafficRule> trafficRule = contextManager.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TrafficRule.class);
+        Optional<MetaDataPersistService> metaDataPersistService = contextManager.getMetaDataContexts().getMetaDataPersistService();
+        if (!trafficRule.isPresent() || !metaDataPersistService.isPresent()) {
+            return Collections.emptyMap();
+        }
+        Map<String, DataSourceConfiguration> dataSourceConfigs = metaDataPersistService.get().getDataSourceService().load(schema);
+        Preconditions.checkState(!dataSourceConfigs.isEmpty(), "Can not get dataSource configurations from meta data.");
+        DataSourceConfiguration dataSourceConfigSample = dataSourceConfigs.values().iterator().next();
+        Collection<ComputeNodeInstance> instances = metaDataPersistService.get().loadComputeNodeInstances(InstanceType.PROXY, trafficRule.get().getLabels());
+        return DataSourcePoolCreatorUtil.getDataSourceMap(createDataSourceConfigs(instances, dataSourceConfigSample, schema));
+    }
+    
+    private Map<String, DataSourceConfiguration> createDataSourceConfigs(final Collection<ComputeNodeInstance> instances,
+                                                                         final DataSourceConfiguration dataSourceConfigSample, final String schema) {
+        Map<String, DataSourceConfiguration> result = new LinkedHashMap<>();
+        for (ComputeNodeInstance each : instances) {
+            result.put(each.getInstanceDefinition().getInstanceId().getId(), createDataSourceConfig(each, dataSourceConfigSample, schema));
+        }
+        return result;
+    }
+    
+    private DataSourceConfiguration createDataSourceConfig(final ComputeNodeInstance instance,
+                                                           final DataSourceConfiguration dataSourceConfigSample, final String schema) {
+        Map<String, Object> props = dataSourceConfigSample.getProps();
+        props.put("jdbcUrl", createJdbcUrl(instance, schema, props));
+        Preconditions.checkState(!instance.getUsers().isEmpty(), "Can not get users from meta data.");
+        ShardingSphereUser user = instance.getUsers().iterator().next();
+        props.put("username", user.getGrantee().getUsername());
+        props.put("password", user.getPassword());
+        DataSourceConfiguration result = new DataSourceConfiguration(HikariDataSource.class.getName());
+        result.getProps().putAll(props);
+        return result;
+    }
+    
+    private String createJdbcUrl(final ComputeNodeInstance instance, final String schema, final Map<String, Object> props) {
+        String jdbcUrl = String.valueOf(props.get("jdbcUrl"));
+        String username = String.valueOf(props.get("username"));
+        DataSourceMetaData dataSourceMetaData = DatabaseTypeRegistry.getDatabaseTypeByURL(jdbcUrl).getDataSourceMetaData(jdbcUrl, username);
+        InstanceId instanceId = instance.getInstanceDefinition().getInstanceId();
+        return jdbcUrl.replace(dataSourceMetaData.getHostname(), instanceId.getIp())
+                .replace(String.valueOf(dataSourceMetaData.getPort()), String.valueOf(instanceId.getPort())).replace(dataSourceMetaData.getCatalog(), schema);
+    }
+    
     private ConnectionTransaction createConnectionTransaction(final String schemaName, final ContextManager contextManager) {
         TransactionType type = TransactionTypeHolder.get();
         if (null == type) {
@@ -174,7 +235,8 @@ public final class ConnectionManager implements ExecutorJDBCManager, AutoCloseab
      * @return random physical data source name
      */
     public String getRandomPhysicalDataSourceName() {
-        Collection<String> datasourceNames = cachedConnections.isEmpty() ? dataSourceMap.keySet() : cachedConnections.keySet();
+        Collection<String> cachedPhysicalDataSourceNames = Sets.intersection(physicalDataSourceMap.keySet(), cachedConnections.keySet());
+        Collection<String> datasourceNames = cachedPhysicalDataSourceNames.isEmpty() ? physicalDataSourceMap.keySet() : cachedPhysicalDataSourceNames;
         return new ArrayList<>(datasourceNames).get(random.nextInt(datasourceNames.size()));
     }
     
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 3895e5b..fd4caaa 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -75,8 +75,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import org.apache.shardingsphere.traffic.context.TrafficContext;
 import org.apache.shardingsphere.traffic.engine.TrafficEngine;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
 
 import java.sql.Connection;
@@ -188,10 +186,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             clearPrevious();
             LogicSQL logicSQL = createLogicSQL();
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
-                return trafficExecutor.executeQuery(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).executeQuery());
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeQuery());
             }
             // TODO move federation route logic to binder
             executionContext = createExecutionContext(logicSQL);
@@ -206,6 +203,12 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         }
     }
     
+    private JDBCExecutionUnit createTrafficExecutionUnit(final TrafficContext trafficContext) throws SQLException {
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
+        ExecutionGroupContext<JDBCExecutionUnit> context = prepareEngine.prepare(trafficContext.getRouteContext(), trafficContext.getExecutionUnits());
+        return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(() -> new ShardingSphereException("Can not get traffic execution unit."));
+    }
+    
     private TrafficContext createTrafficContext(final LogicSQL logicSQL) {
         Optional<TrafficRule> trafficRule = metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class);
         return trafficRule.map(optional -> new TrafficEngine(optional, metaDataContexts).dispatch(logicSQL)).orElse(new TrafficContext());
@@ -260,10 +263,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             clearPrevious();
             LogicSQL logicSQL = createLogicSQL();
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
-                return trafficExecutor.executeUpdate(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
             }
             executionContext = createExecutionContext(logicSQL);
             if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -313,10 +315,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             clearPrevious();
             LogicSQL logicSQL = createLogicSQL();
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
-                return trafficExecutor.execute(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).execute());
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).execute());
             }
             executionContext = createExecutionContext(logicSQL);
             if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -369,7 +370,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        if (trafficContext.getInstanceId().isPresent()) {
+        if (trafficContext.isMatchTraffic()) {
             return executor.getTrafficExecutor().getResultSet();
         }
         if (executionContext.getRouteContext().isFederated()) {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 616311f..78b4597 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -71,9 +71,6 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
 import org.apache.shardingsphere.traffic.context.TrafficContext;
 import org.apache.shardingsphere.traffic.engine.TrafficEngine;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
 
 import java.sql.Connection;
@@ -143,10 +140,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
-                return trafficExecutor.executeQuery(logicSQL, context, Statement::executeQuery);
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, Statement::executeQuery);
             }
             executionContext = createExecutionContext(logicSQL);
             // TODO move federation route logic to binder
@@ -201,10 +197,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
-                return trafficExecutor.executeUpdate(logicSQL, context, Statement::executeUpdate);
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, Statement::executeUpdate);
             }
             executionContext = createExecutionContext(logicSQL);
             if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -227,10 +222,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
-                return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
             }
             executionContext = createExecutionContext(logicSQL);
             if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -251,10 +245,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
-                return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
             }
             executionContext = createExecutionContext(logicSQL);
             if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -275,10 +268,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
-                return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
             }
             executionContext = createExecutionContext(logicSQL);
             if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -364,15 +356,13 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         return executor.getRegularExecutor().execute(executionGroupContext, executionContext.getLogicSQL(), routeUnits, jdbcExecutorCallback);
     }
     
-    @SuppressWarnings({"rawtypes", "unchecked"})
     private boolean execute0(final String sql, final ExecuteCallback callback) throws SQLException {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getInstanceId().isPresent()) {
-                TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
-                return trafficExecutor.execute(logicSQL, context, (TrafficExecutorCallback) (statement, actualSQL) -> callback.execute(actualSQL, statement));
+            if (trafficContext.isMatchTraffic()) {
+                JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficContext);
+                return executor.getTrafficExecutor().execute(executionUnit, (statement, actualSQL) -> callback.execute(actualSQL, statement));
             }
             executionContext = createExecutionContext(logicSQL);
             if (metaDataContexts.getMetaData(connection.getSchema()).getRuleMetaData().getRules().stream().anyMatch(each -> each instanceof RawExecutionRule)) {
@@ -393,6 +383,12 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         }
     }
     
+    private JDBCExecutionUnit createTrafficExecutionUnit(final TrafficContext trafficContext) throws SQLException {
+        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine();
+        ExecutionGroupContext<JDBCExecutionUnit> context = prepareEngine.prepare(trafficContext.getRouteContext(), trafficContext.getExecutionUnits());
+        return context.getInputGroups().stream().flatMap(each -> each.getInputs().stream()).findFirst().orElseThrow(() -> new ShardingSphereException("Can not get traffic execution unit."));
+    }
+    
     private void clearStatements() throws SQLException {
         for (Statement each : statements) {
             each.close();
@@ -450,7 +446,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        if (trafficContext.getInstanceId().isPresent()) {
+        if (trafficContext.isMatchTraffic()) {
             return executor.getTrafficExecutor().getResultSet();
         }
         if (executionContext.getRouteContext().isFederated()) {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
index 421f71a..e3dc9bd 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/jdbc/core/connection/ConnectionManagerTest.java
@@ -17,41 +17,110 @@
 
 package org.apache.shardingsphere.driver.jdbc.core.connection;
 
+import com.google.common.collect.Sets;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
 import org.apache.shardingsphere.infra.database.DefaultSchema;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.InstanceDefinition;
+import org.apache.shardingsphere.infra.instance.InstanceType;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.rule.TransactionRule;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.when;
 
 public final class ConnectionManagerTest {
     
     private ConnectionManager connectionManager;
     
+    private MockedStatic<DataSourcePoolCreatorUtil> dataSourcePoolCreatorUtil;
+    
     @Before
     public void setUp() throws SQLException {
         connectionManager = new ConnectionManager(DefaultSchema.LOGIC_NAME, mockContextManager());
     }
     
+    @After
+    public void cleanUp() {
+        dataSourcePoolCreatorUtil.close();
+    }
+    
     private ContextManager mockContextManager() throws SQLException {
         ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
         Map<String, DataSource> dataSourceMap = mockDataSourceMap();
+        TrafficRule trafficRule = mockTrafficRule();
+        MetaDataPersistService metaDataPersistService = mockMetaDataPersistService();
         when(result.getDataSourceMap(DefaultSchema.LOGIC_NAME)).thenReturn(dataSourceMap);
+        when(result.getMetaDataContexts().getMetaDataPersistService()).thenReturn(Optional.of(metaDataPersistService));
         when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TransactionRule.class)).thenReturn(Optional.empty());
+        when(result.getMetaDataContexts().getGlobalRuleMetaData().findSingleRule(TrafficRule.class)).thenReturn(Optional.of(trafficRule));
+        dataSourcePoolCreatorUtil = mockStatic(DataSourcePoolCreatorUtil.class);
+        Map<String, DataSource> trafficDataSourceMap = mockTrafficDataSourceMap();
+        when(DataSourcePoolCreatorUtil.getDataSourceMap(any())).thenReturn(trafficDataSourceMap);
+        return result;
+    }
+    
+    private Map<String, DataSource> mockTrafficDataSourceMap() {
+        Map<String, DataSource> trafficDataSourceMap = new LinkedHashMap<>();
+        trafficDataSourceMap.put("127.0.0.1@3307", mock(DataSource.class));
+        return trafficDataSourceMap;
+    }
+    
+    private MetaDataPersistService mockMetaDataPersistService() {
+        MetaDataPersistService result = mock(MetaDataPersistService.class, RETURNS_DEEP_STUBS);
+        when(result.getDataSourceService().load(DefaultSchema.LOGIC_NAME)).thenReturn(mockDataSourceConfigMap());
+        when(result.loadComputeNodeInstances(InstanceType.PROXY, Arrays.asList("OLTP", "OLAP"))).thenReturn(Collections.singletonList(mockComputeNodeInstance()));
+        return result;
+    }
+    
+    private Map<String, DataSourceConfiguration> mockDataSourceConfigMap() {
+        Map<String, DataSourceConfiguration> result = new LinkedHashMap<>();
+        DataSourceConfiguration dataSourceConfig = new DataSourceConfiguration(HikariDataSource.class.getName());
+        result.put(DefaultSchema.LOGIC_NAME, dataSourceConfig);
+        dataSourceConfig.getProps().put("jdbcUrl", "jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false");
+        dataSourceConfig.getProps().put("username", "root");
+        dataSourceConfig.getProps().put("password", "123456");
+        return result;
+    }
+    
+    private ComputeNodeInstance mockComputeNodeInstance() {
+        ComputeNodeInstance result = new ComputeNodeInstance();
+        result.setUsers(Collections.singletonList(new ShardingSphereUser("root", "root", "localhost")));
+        result.setLabels(Collections.singletonList("OLTP"));
+        result.setInstanceDefinition(new InstanceDefinition(InstanceType.PROXY, "127.0.0.1@3307"));
+        return result;
+    }
+    
+    private TrafficRule mockTrafficRule() {
+        TrafficRule result = mock(TrafficRule.class);
+        when(result.getLabels()).thenReturn(Arrays.asList("OLTP", "OLAP"));
         return result;
     }
     
@@ -65,10 +134,9 @@ public final class ConnectionManagerTest {
     }
     
     @Test
-    public void assertGetRandomPhysicalDataSourceNameFromContextManager() throws SQLException {
-        connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
+    public void assertGetRandomPhysicalDataSourceNameFromContextManager() {
         String actual = connectionManager.getRandomPhysicalDataSourceName();
-        assertThat(actual, is("ds"));
+        assertTrue(Sets.newHashSet("ds", "invalid_ds").contains(actual)); // no connection is fetched beforehand any more, so either name is accepted
     }
     
     @Test
@@ -85,6 +153,12 @@ public final class ConnectionManagerTest {
     }
     
     @Test
+    public void assertGetConnectionWhenConfigTrafficRule() throws SQLException {
+        List<Connection> actual = connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY);
+        assertThat(actual, is(connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY))); // a repeated lookup must yield an equal list
+    }
+    
+    @Test
     public void assertGetConnectionsWhenAllInCache() throws SQLException {
         Connection expected = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY).get(0);
         List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.CONNECTION_STRICTLY);
@@ -93,12 +167,26 @@ public final class ConnectionManagerTest {
     }
     
     @Test
+    public void assertGetConnectionsWhenConfigTrafficRuleAndAllInCache() throws SQLException {
+        Connection expected = connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY).get(0); // warm-up lookup; test name implies it fills the cache
+        List<Connection> actual = connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.CONNECTION_STRICTLY); // second lookup uses a different connection mode
+        assertThat(actual.size(), is(1));
+        assertThat(actual.get(0), is(expected)); // expects the connection from the first lookup to be handed back
+    }
+    
+    @Test
     public void assertGetConnectionsWhenEmptyCache() throws SQLException {
         List<Connection> actual = connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
         assertThat(actual.size(), is(1));
     }
     
     @Test
+    public void assertGetConnectionsWhenConfigTrafficRuleAndEmptyCache() throws SQLException {
+        List<Connection> actual = connectionManager.getConnections("127.0.0.1@3307", 1, ConnectionMode.MEMORY_STRICTLY); // only lookup in this test, nothing fetched for this id before
+        assertThat(actual.size(), is(1));
+    }
+    
+    @Test
     public void assertGetConnectionsWhenPartInCacheWithMemoryStrictlyMode() throws SQLException {
         connectionManager.getConnections("ds", 1, ConnectionMode.MEMORY_STRICTLY);
         List<Connection> actual = connectionManager.getConnections("ds", 3, ConnectionMode.MEMORY_STRICTLY);
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml
index 4f4cbe5..6ac16ff 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/pom.xml
@@ -48,10 +48,5 @@
             <artifactId>shardingsphere-cluster-mode-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.zaxxer</groupId>
-            <artifactId>HikariCP</artifactId>
-            <scope>compile</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
index eb38ab7..4313df4 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
@@ -19,8 +19,11 @@ package org.apache.shardingsphere.traffic.context;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.route.context.RouteContext;
 
-import java.util.Optional;
+import java.util.Collection;
+import java.util.LinkedList;
 
 /**
  * Traffic context.
@@ -29,14 +32,16 @@ import java.util.Optional;
 @Setter
 public final class TrafficContext {
     
-    private String instanceId;
+    private RouteContext routeContext = new RouteContext();
+
+    private Collection<ExecutionUnit> executionUnits = new LinkedList<>();
     
     /**
-     * Get instance id.
+     * Judge whether statement matches traffic or not.
      * 
-     * @return instance id
+     * @return whether statement matches traffic or not
      */
-    public Optional<String> getInstanceId() {
-        return Optional.ofNullable(instanceId);
+    public boolean isMatchTraffic() {
+        return !executionUnits.isEmpty(); // matched only when at least one execution unit has been added
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
index 76284f5..69b560a 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.traffic.engine;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceType;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -57,11 +59,16 @@ public final class TrafficEngine {
         List<String> instanceIds = getInstanceIdsByLabels(strategyRule.get().getLabels());
         if (!instanceIds.isEmpty()) {
             TrafficLoadBalanceAlgorithm loadBalancer = trafficRule.findLoadBalancer(strategyRule.get().getLoadBalancerName());
-            result.setInstanceId(loadBalancer.getInstanceId(strategyRule.get().getName(), instanceIds));
+            String instanceId = loadBalancer.getInstanceId(strategyRule.get().getName(), instanceIds);
+            result.getExecutionUnits().add(createExecutionUnit(logicSQL, instanceId));
         }
         return result;
     }
     
+    private ExecutionUnit createExecutionUnit(final LogicSQL logicSQL, final String instanceId) {
+        return new ExecutionUnit(instanceId, new SQLUnit(logicSQL.getSql(), logicSQL.getParameters())); // logic SQL is forwarded as-is; instance id presumably serves as data source name (confirm)
+    }
+    
     private List<String> getInstanceIdsByLabels(final Collection<String> labels) {
         List<String> result = new ArrayList<>();
         if (metaDataContexts.getMetaDataPersistService().isPresent()) {
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index f80df43..cd09896 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -17,61 +17,52 @@
 
 package org.apache.shardingsphere.traffic.executor;
 
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
+import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.List;
 
 /**
  * Traffic executor.
  */
-public interface TrafficExecutor extends AutoCloseable {
+public final class TrafficExecutor implements AutoCloseable {
     
-    /**
-     * Prepare for traffic executor.
-     * 
-     * @param logicSQL logic SQL
-     * @param instanceId instance id
-     * @param type type
-     * @return traffic executor context
-     * @throws SQLException SQL exception
-     */
-    TrafficExecutorContext<Statement> prepare(LogicSQL logicSQL, String instanceId, String type) throws SQLException;
+    private Statement statement;
     
     /**
-     * Execute query.
+     * Execute.
      * 
-     * @param logicSQL logic SQL
-     * @param context traffic executor context
+     * @param executionUnit execution unit
      * @param callback traffic executor callback
-     * @return result set
+     * @param <T> return type
+     * @return execute result
      * @throws SQLException SQL exception
      */
-    ResultSet executeQuery(LogicSQL logicSQL, TrafficExecutorContext<Statement> context, TrafficExecutorCallback<ResultSet> callback) throws SQLException;
+    public <T> T execute(final JDBCExecutionUnit executionUnit, final TrafficExecutorCallback<T> callback) throws SQLException {
+        SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit();
+        cacheStatement(sqlUnit.getParameters(), executionUnit.getStorageResource()); // keeps the statement for getResultSet()/close() and binds the parameters
+        return callback.execute(statement, sqlUnit.getSql()); // result is whatever the callback returns
+    }
     
-    /**
-     * Execute update.
-     * 
-     * @param logicSQL logic SQL
-     * @param context traffic executor context
-     * @param callback traffic executor callback
-     * @return update count
-     * @throws SQLException SQL exception
-     */
-    int executeUpdate(LogicSQL logicSQL, TrafficExecutorContext<Statement> context, TrafficExecutorCallback<Integer> callback) throws SQLException;
+    private void cacheStatement(final List<Object> parameters, final Statement statement) throws SQLException {
+        this.statement = statement; // stored before binding, so close() still sees it when setParameters throws
+        setParameters(statement, parameters);
+    }
     
-    /**
-     * Execute.
-     * 
-     * @param logicSQL logic SQL
-     * @param context traffic executor context
-     * @param callback traffic executor callback
-     * @return whether execute success or not
-     * @throws SQLException SQL exception
-     */
-    boolean execute(LogicSQL logicSQL, TrafficExecutorContext<Statement> context, TrafficExecutorCallback<Boolean> callback) throws SQLException;
+    private void setParameters(final Statement statement, final List<Object> parameters) throws SQLException {
+        if (statement instanceof PreparedStatement) { // plain statements carry no placeholders, nothing to bind
+            PreparedStatement preparedStatement = (PreparedStatement) statement;
+            int position = 0; // pre-incremented below, so the first parameter is bound at index 1
+            for (Object each : parameters) {
+                preparedStatement.setObject(++position, each);
+            }
+        }
+    }
     
     /**
      * Get result set.
@@ -79,8 +70,16 @@ public interface TrafficExecutor extends AutoCloseable {
      * @return result set
      * @throws SQLException SQL exception
      */
-    ResultSet getResultSet() throws SQLException;
+    public ResultSet getResultSet() throws SQLException {
+        return statement.getResultSet(); // statement is assigned only via execute(); it is still null before the first execute() call
+    }
     
     @Override
-    void close() throws SQLException;
+    public void close() throws SQLException {
+        if (null != statement) { // nothing to release when execute() was never called
+            try (Connection connection = statement.getConnection()) { // connection is closed even when statement.close() throws
+                statement.close();
+            }
+        }
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorFactory.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorFactory.java
deleted file mode 100644
index 0850775..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.traffic.executor.jdbc.JDBCTrafficExecutor;
-
-/**
- * Traffic executor factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class TrafficExecutorFactory {
-    
-    /**
-     * Create new instance of traffic executor factory.
-     * 
-     * @param schema schema
-     * @param metaDataContexts meta data contexts
-     * @return new instance of traffic executor
-     */
-    public static TrafficExecutor newInstance(final String schema, final MetaDataContexts metaDataContexts) {
-        return new JDBCTrafficExecutor(schema, metaDataContexts);
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/TrafficExecutorContext.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/TrafficExecutorContext.java
deleted file mode 100644
index d2391e8..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/TrafficExecutorContext.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-import java.sql.Statement;
-
-/**
- * Traffic executor context.
- */
-@RequiredArgsConstructor
-@Getter
-public final class TrafficExecutorContext<T extends Statement> {
-    
-    private final T statement;
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/PreparedStatementExecutorContextBuilder.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/PreparedStatementExecutorContextBuilder.java
deleted file mode 100644
index 5496244..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/PreparedStatementExecutorContextBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context.builder;
-
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-/**
- * Prepared statement executor context builder.
- */
-public final class PreparedStatementExecutorContextBuilder implements TrafficExecutorContextBuilder<PreparedStatement> {
-    
-    @Override
-    public TrafficExecutorContext<PreparedStatement> build(final LogicSQL logicSQL, final Connection connection) throws SQLException {
-        return new TrafficExecutorContext<>(connection.prepareStatement(logicSQL.getSql()));
-    }
-    
-    @Override
-    public String getType() {
-        return JDBCDriverType.PREPARED_STATEMENT;
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/StatementExecutorContextBuilder.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/StatementExecutorContextBuilder.java
deleted file mode 100644
index ea29fa6..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/StatementExecutorContextBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context.builder;
-
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * Statement executor context builder.
- */
-public final class StatementExecutorContextBuilder implements TrafficExecutorContextBuilder<Statement> {
-    
-    @Override
-    public TrafficExecutorContext<Statement> build(final LogicSQL logicSQL, final Connection connection) throws SQLException {
-        return new TrafficExecutorContext<>(connection.createStatement());
-    }
-    
-    @Override
-    public String getType() {
-        return JDBCDriverType.STATEMENT;
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/TrafficExecutorContextBuilder.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/TrafficExecutorContextBuilder.java
deleted file mode 100644
index 33daef3..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/context/builder/TrafficExecutorContextBuilder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.context.builder;
-
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.spi.typed.TypedSPI;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * Traffic executor context builder.
- */
-public interface TrafficExecutorContextBuilder<T extends Statement> extends TypedSPI {
-    
-    /**
-     * Build traffic executor context.
-     * 
-     * @param logicSQL logic SQL
-     * @param connection connection
-     * @return traffic executor context
-     * @throws SQLException SQL exception
-     */
-    TrafficExecutorContext<T> build(LogicSQL logicSQL, Connection connection) throws SQLException;
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
deleted file mode 100644
index 7e20d43..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor.jdbc;
-
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.datasource.pool.creator.DataSourcePoolCreatorUtil;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.exception.ShardingSphereException;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.InstanceId;
-import org.apache.shardingsphere.infra.instance.InstanceType;
-import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.executor.context.TrafficExecutorContext;
-import org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder;
-import org.apache.shardingsphere.traffic.rule.TrafficRule;
-
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * JDBC traffic executor.
- */
-public final class JDBCTrafficExecutor implements TrafficExecutor {
-    
-    private static final Map<String, TrafficExecutorContextBuilder<?>> TYPE_CONTEXT_BUILDERS = new ConcurrentHashMap<>();
-    
-    private static final String JDBC_URL = "jdbcUrl";
-    
-    private static final String USER_NAME = "username";
-    
-    private static final String PASSWORD = "password";
-    
-    private final Map<String, DataSource> dataSources = new LinkedHashMap<>();
-    
-    private Statement statement;
-    
-    static {
-        ShardingSphereServiceLoader.register(TrafficExecutorContextBuilder.class);
-    }
-    
-    public JDBCTrafficExecutor(final String schema, final MetaDataContexts metaDataContexts) {
-        Optional<TrafficRule> trafficRule = metaDataContexts.getGlobalRuleMetaData().findSingleRule(TrafficRule.class);
-        if (trafficRule.isPresent() && metaDataContexts.getMetaDataPersistService().isPresent()) {
-            Map<String, DataSourceConfiguration> dataSourceConfigs = metaDataContexts.getMetaDataPersistService().get().getDataSourceService().load(schema);
-            if (dataSourceConfigs.isEmpty()) {
-                throw new ShardingSphereException("Can not get dataSource configs from meta data.");
-            }
-            DataSourceConfiguration dataSourceConfigSample = dataSourceConfigs.values().iterator().next();
-            Collection<ComputeNodeInstance> instances = metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY, trafficRule.get().getLabels());
-            dataSources.putAll(DataSourcePoolCreatorUtil.getDataSourceMap(createDataSourceConfigs(instances, dataSourceConfigSample, schema)));
-        }
-    }
-    
-    private Map<String, DataSourceConfiguration> createDataSourceConfigs(final Collection<ComputeNodeInstance> instances, 
-                                                                         final DataSourceConfiguration dataSourceConfigSample, final String schema) {
-        Map<String, DataSourceConfiguration> result = new LinkedHashMap<>();
-        for (ComputeNodeInstance each : instances) {
-            result.put(each.getInstanceDefinition().getInstanceId().getId(), createDataSourceConfig(each, dataSourceConfigSample, schema));
-        }
-        return result;
-    }
-    
-    private DataSourceConfiguration createDataSourceConfig(final ComputeNodeInstance instance, 
-                                                           final DataSourceConfiguration dataSourceConfigSample, final String schema) {
-        Map<String, Object> props = dataSourceConfigSample.getProps();
-        props.put(JDBC_URL, createJdbcUrl(instance, schema, props));
-        if (instance.getUsers().isEmpty()) {
-            throw new ShardingSphereException("Can not get users from meta data.");
-        }
-        ShardingSphereUser user = instance.getUsers().iterator().next();
-        props.put(USER_NAME, user.getGrantee().getUsername());
-        props.put(PASSWORD, user.getPassword());
-        DataSourceConfiguration result = new DataSourceConfiguration(HikariDataSource.class.getName());
-        result.getProps().putAll(props);
-        return result;
-    }
-    
-    private String createJdbcUrl(final ComputeNodeInstance instance, final String schema, final Map<String, Object> props) {
-        String jdbcUrl = String.valueOf(props.get(JDBC_URL));
-        String username = String.valueOf(props.get(USER_NAME));
-        DataSourceMetaData dataSourceMetaData = DatabaseTypeRegistry.getDatabaseTypeByURL(jdbcUrl).getDataSourceMetaData(jdbcUrl, username);
-        InstanceId instanceId = instance.getInstanceDefinition().getInstanceId();
-        return jdbcUrl.replace(dataSourceMetaData.getHostname(), instanceId.getIp())
-                .replace(String.valueOf(dataSourceMetaData.getPort()), String.valueOf(instanceId.getPort())).replace(dataSourceMetaData.getCatalog(), schema);
-    }
-    
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    public TrafficExecutorContext<Statement> prepare(final LogicSQL logicSQL, final String instanceId, final String type) throws SQLException {
-        if (!dataSources.containsKey(instanceId)) {
-            throw new ShardingSphereException("Can not get dataSource of %.", instanceId);
-        }
-        DataSource dataSource = dataSources.get(instanceId);
-        TrafficExecutorContextBuilder builder = getCachedContextBuilder(type);
-        return builder.build(logicSQL, dataSource.getConnection());
-    }
-    
-    private TrafficExecutorContextBuilder<?> getCachedContextBuilder(final String type) {
-        TrafficExecutorContextBuilder<?> result;
-        if (null == (result = TYPE_CONTEXT_BUILDERS.get(type))) {
-            result = TYPE_CONTEXT_BUILDERS.computeIfAbsent(type, key -> TypedSPIRegistry.getRegisteredService(TrafficExecutorContextBuilder.class, key, new Properties()));
-        }
-        return result;
-    }
-    
-    @Override
-    public ResultSet executeQuery(final LogicSQL logicSQL, final TrafficExecutorContext<Statement> context, final TrafficExecutorCallback<ResultSet> callback) throws SQLException {
-        cacheStatement(logicSQL.getParameters(), context.getStatement());
-        return callback.execute(statement, logicSQL.getSql());
-    }
-    
-    private void cacheStatement(final List<Object> parameters, final Statement statement) throws SQLException {
-        this.statement = statement;
-        setParameters(statement, parameters);
-    }
-    
-    private void setParameters(final Statement statement, final List<Object> parameters) throws SQLException {
-        if (statement instanceof PreparedStatement) {
-            int index = 1;
-            for (Object each : parameters) {
-                ((PreparedStatement) statement).setObject(index++, each);
-            }
-        }
-    }
-    
-    @Override
-    public int executeUpdate(final LogicSQL logicSQL, final TrafficExecutorContext<Statement> context, final TrafficExecutorCallback<Integer> callback) throws SQLException {
-        cacheStatement(logicSQL.getParameters(), context.getStatement());
-        return callback.execute(statement, logicSQL.getSql());
-    }
-    
-    @Override
-    public boolean execute(final LogicSQL logicSQL, final TrafficExecutorContext<Statement> context, final TrafficExecutorCallback<Boolean> callback) throws SQLException {
-        cacheStatement(logicSQL.getParameters(), context.getStatement());
-        return callback.execute(statement, logicSQL.getSql());
-    }
-    
-    @Override
-    public ResultSet getResultSet() throws SQLException {
-        return statement.getResultSet();
-    }
-    
-    @Override
-    public void close() throws SQLException {
-        if (null != statement) {
-            Connection connection = statement.getConnection();
-            statement.close();
-            connection.close();
-        }
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/resources/META-INF/services/org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/resources/META-INF/services/org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder
deleted file mode 100644
index da5b38f..0000000
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/resources/META-INF/services/org.apache.shardingsphere.traffic.executor.context.builder.TrafficExecutorContextBuilder
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.traffic.executor.context.builder.StatementExecutorContextBuilder
-org.apache.shardingsphere.traffic.executor.context.builder.PreparedStatementExecutorContextBuilder