You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/01/06 09:21:48 UTC

[shardingsphere] branch master updated: Add more TrafficLoadBalanceAlgorithm and optimize traffic logic (#14567)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e719e9b  Add more TrafficLoadBalanceAlgorithm and optimize traffic logic (#14567)
e719e9b is described below

commit e719e9bf19ab9212908f409888dc7c55be65d8e1
Author: Zhengqiang Duan <du...@apache.org>
AuthorDate: Thu Jan 6 17:20:19 2022 +0800

    Add more TrafficLoadBalanceAlgorithm and optimize traffic logic (#14567)
    
    * modify governance API reference in JDBCTrafficExecutor
    
    * add more TrafficLoadBalanceAlgorithm and optimize traffic logic
    
    * remove some algorithms
---
 .../statement/ShardingSpherePreparedStatement.java | 14 +++---
 .../core/statement/ShardingSphereStatement.java    | 26 +++++------
 .../api/traffic/segment/SegmentTrafficValue.java   |  2 +
 .../traffic/spi/TrafficLoadBalanceAlgorithm.java   |  9 ++--
 .../RandomTrafficLoadBalanceAlgorithm.java         |  4 +-
 ... => RoundRobinTrafficLoadBalanceAlgorithm.java} | 30 +++++++++---
 .../traffic/context/TrafficContext.java            | 10 ++--
 .../traffic/engine/TrafficEngine.java              | 11 +++--
 .../traffic/executor/TrafficExecutor.java          |  4 +-
 .../traffic/executor/jdbc/JDBCTrafficExecutor.java | 14 +++---
 .../shardingsphere/traffic/rule/TrafficRule.java   | 17 +++----
 .../RandomTrafficLoadBalanceAlgorithmTest.java}    | 25 +++++-----
 .../RoundRobinTrafficLoadBalanceAlgorithmTest.java | 53 ++++++++++++++++++++++
 13 files changed, 147 insertions(+), 72 deletions(-)

diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 9e79414..3895e5b 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -188,9 +188,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             clearPrevious();
             LogicSQL logicSQL = createLogicSQL();
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.PREPARED_STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
                 return trafficExecutor.executeQuery(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).executeQuery());
             }
             // TODO move federation route logic to binder
@@ -260,9 +260,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             clearPrevious();
             LogicSQL logicSQL = createLogicSQL();
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.PREPARED_STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
                 return trafficExecutor.executeUpdate(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).executeUpdate());
             }
             executionContext = createExecutionContext(logicSQL);
@@ -313,9 +313,9 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
             clearPrevious();
             LogicSQL logicSQL = createLogicSQL();
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.PREPARED_STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.PREPARED_STATEMENT);
                 return trafficExecutor.execute(logicSQL, context, (statement, sql) -> ((PreparedStatement) statement).execute());
             }
             executionContext = createExecutionContext(logicSQL);
@@ -369,7 +369,7 @@ public final class ShardingSpherePreparedStatement extends AbstractPreparedState
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        if (trafficContext.getDataSourceName().isPresent()) {
+        if (trafficContext.getInstanceId().isPresent()) {
             return executor.getTrafficExecutor().getResultSet();
         }
         if (executionContext.getRouteContext().isFederated()) {
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index acefef6..616311f 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -143,9 +143,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
                 return trafficExecutor.executeQuery(logicSQL, context, Statement::executeQuery);
             }
             executionContext = createExecutionContext(logicSQL);
@@ -201,9 +201,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
                 return trafficExecutor.executeUpdate(logicSQL, context, Statement::executeUpdate);
             }
             executionContext = createExecutionContext(logicSQL);
@@ -227,9 +227,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
                 return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, autoGeneratedKeys));
             }
             executionContext = createExecutionContext(logicSQL);
@@ -251,9 +251,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
                 return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnIndexes));
             }
             executionContext = createExecutionContext(logicSQL);
@@ -275,9 +275,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
                 return trafficExecutor.executeUpdate(logicSQL, context, (statement, actualSQL) -> statement.executeUpdate(actualSQL, columnNames));
             }
             executionContext = createExecutionContext(logicSQL);
@@ -369,9 +369,9 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         try {
             LogicSQL logicSQL = createLogicSQL(sql);
             trafficContext = createTrafficContext(logicSQL);
-            if (trafficContext.getDataSourceName().isPresent()) {
+            if (trafficContext.getInstanceId().isPresent()) {
                 TrafficExecutor trafficExecutor = executor.getTrafficExecutor();
-                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getDataSourceName().get(), JDBCDriverType.STATEMENT);
+                TrafficExecutorContext<Statement> context = trafficExecutor.prepare(logicSQL, trafficContext.getInstanceId().get(), JDBCDriverType.STATEMENT);
                 return trafficExecutor.execute(logicSQL, context, (TrafficExecutorCallback) (statement, actualSQL) -> callback.execute(actualSQL, statement));
             }
             executionContext = createExecutionContext(logicSQL);
@@ -450,7 +450,7 @@ public final class ShardingSphereStatement extends AbstractStatementAdapter {
         if (null != currentResultSet) {
             return currentResultSet;
         }
-        if (trafficContext.getDataSourceName().isPresent()) {
+        if (trafficContext.getInstanceId().isPresent()) {
             return executor.getTrafficExecutor().getResultSet();
         }
         if (executionContext.getRouteContext().isFederated()) {
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java
index 9df41d0..4f21a77 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/api/traffic/segment/SegmentTrafficValue.java
@@ -30,4 +30,6 @@ import org.apache.shardingsphere.traffic.api.traffic.TrafficValue;
 public final class SegmentTrafficValue implements TrafficValue {
     
     private final SQLStatement statement;
+    
+    private final String sql;
 }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java
index bea7fee..f237b94 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-api/src/main/java/org/apache/shardingsphere/traffic/spi/TrafficLoadBalanceAlgorithm.java
@@ -28,10 +28,11 @@ import java.util.List;
 public interface TrafficLoadBalanceAlgorithm extends ShardingSphereAlgorithm, RequiredSPI {
     
     /**
-     * Get dataSource name.
+     * Get instance id.
      * 
-     * @param dataSourceNames dataSource name collection
-     * @return dataSource name
+     * @param name traffic strategy name
+     * @param instanceIds instance id collection
+     * @return instance id
      */
-    String getDataSourceName(List<String> dataSourceNames);
+    String getInstanceId(String name, List<String> instanceIds);
 }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
index c5cb4fa..9fd4ba6 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
@@ -28,8 +28,8 @@ import java.util.concurrent.ThreadLocalRandom;
 public final class RandomTrafficLoadBalanceAlgorithm implements TrafficLoadBalanceAlgorithm {
     
     @Override
-    public String getDataSourceName(final List<String> dataSourceNames) {
-        return dataSourceNames.get(ThreadLocalRandom.current().nextInt(dataSourceNames.size()));
+    public String getInstanceId(final String name, final List<String> instanceIds) {
+        return instanceIds.get(ThreadLocalRandom.current().nextInt(instanceIds.size()));
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithm.java
similarity index 52%
copy from shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
copy to shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithm.java
index c5cb4fa..adb726d 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithm.java
@@ -17,23 +17,41 @@
 
 package org.apache.shardingsphere.traffic.algorithm.loadbalance;
 
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.shardingsphere.traffic.spi.TrafficLoadBalanceAlgorithm;
 
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * Random traffic load balance algorithm.
+ * Round-robin traffic load balance algorithm.
  */
-public final class RandomTrafficLoadBalanceAlgorithm implements TrafficLoadBalanceAlgorithm {
+@Getter
+@Setter
+public final class RoundRobinTrafficLoadBalanceAlgorithm implements TrafficLoadBalanceAlgorithm {
+    
+    private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>();
+    
+    private Properties props = new Properties();
     
     @Override
-    public String getDataSourceName(final List<String> dataSourceNames) {
-        return dataSourceNames.get(ThreadLocalRandom.current().nextInt(dataSourceNames.size()));
+    public String getInstanceId(final String name, final List<String> instanceIds) {
+        AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
+        COUNTS.putIfAbsent(name, count);
+        count.compareAndSet(instanceIds.size(), 0);
+        return instanceIds.get(Math.abs(count.getAndIncrement()) % instanceIds.size());
     }
     
     @Override
     public String getType() {
-        return "RANDOM";
+        return "ROUND_ROBIN";
+    }
+    
+    @Override
+    public boolean isDefault() {
+        return true;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
index a05b820..eb38ab7 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/context/TrafficContext.java
@@ -29,14 +29,14 @@ import java.util.Optional;
 @Setter
 public final class TrafficContext {
     
-    private String dataSourceName;
+    private String instanceId;
     
     /**
-     * Get data source name.
+     * Get instance id.
      * 
-     * @return data source config
+     * @return instance id
      */
-    public Optional<String> getDataSourceName() {
-        return Optional.ofNullable(dataSourceName);
+    public Optional<String> getInstanceId() {
+        return Optional.ofNullable(instanceId);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
index 20a4160..76284f5 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/engine/TrafficEngine.java
@@ -54,18 +54,19 @@ public final class TrafficEngine {
         if (!strategyRule.isPresent()) {
             return result;
         }
-        List<String> dataSourceNames = getDataSourceNamesByLabels(strategyRule.get().getLabels());
-        if (!dataSourceNames.isEmpty()) {
+        List<String> instanceIds = getInstanceIdsByLabels(strategyRule.get().getLabels());
+        if (!instanceIds.isEmpty()) {
             TrafficLoadBalanceAlgorithm loadBalancer = trafficRule.findLoadBalancer(strategyRule.get().getLoadBalancerName());
-            result.setDataSourceName(loadBalancer.getDataSourceName(dataSourceNames));
+            result.setInstanceId(loadBalancer.getInstanceId(strategyRule.get().getName(), instanceIds));
         }
         return result;
     }
     
-    private List<String> getDataSourceNamesByLabels(final Collection<String> labels) {
+    private List<String> getInstanceIdsByLabels(final Collection<String> labels) {
         List<String> result = new ArrayList<>();
         if (metaDataContexts.getMetaDataPersistService().isPresent()) {
-            for (ComputeNodeInstance each : metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY, labels)) {
+            Collection<ComputeNodeInstance> instances = metaDataContexts.getMetaDataPersistService().get().loadComputeNodeInstances(InstanceType.PROXY, labels);
+            for (ComputeNodeInstance each : instances) {
                 result.add(each.getInstanceDefinition().getInstanceId().getId());
             }
         }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index 701b25d..f80df43 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -33,12 +33,12 @@ public interface TrafficExecutor extends AutoCloseable {
      * Prepare for traffic executor.
      * 
      * @param logicSQL logic SQL
-     * @param dataSourceName dataSource name
+     * @param instanceId instance id
      * @param type type
      * @return traffic executor context
      * @throws SQLException SQL exception
      */
-    TrafficExecutorContext<Statement> prepare(LogicSQL logicSQL, String dataSourceName, String type) throws SQLException;
+    TrafficExecutorContext<Statement> prepare(LogicSQL logicSQL, String instanceId, String type) throws SQLException;
     
     /**
      * Execute query.
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
index 4701588..7e20d43 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/executor/jdbc/JDBCTrafficExecutor.java
@@ -115,21 +115,21 @@ public final class JDBCTrafficExecutor implements TrafficExecutor {
         DataSourceMetaData dataSourceMetaData = DatabaseTypeRegistry.getDatabaseTypeByURL(jdbcUrl).getDataSourceMetaData(jdbcUrl, username);
         InstanceId instanceId = instance.getInstanceDefinition().getInstanceId();
         return jdbcUrl.replace(dataSourceMetaData.getHostname(), instanceId.getIp())
-                .replace(String.valueOf(dataSourceMetaData.getPort()), String.valueOf(instanceId.getPort()).replace(dataSourceMetaData.getCatalog(), schema));
+                .replace(String.valueOf(dataSourceMetaData.getPort()), String.valueOf(instanceId.getPort())).replace(dataSourceMetaData.getCatalog(), schema);
     }
     
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
-    public TrafficExecutorContext<Statement> prepare(final LogicSQL logicSQL, final String dataSourceName, final String type) throws SQLException {
-        if (!dataSources.containsKey(dataSourceName)) {
-            throw new ShardingSphereException("Can not get dataSource of %.", dataSourceName);
+    public TrafficExecutorContext<Statement> prepare(final LogicSQL logicSQL, final String instanceId, final String type) throws SQLException {
+        if (!dataSources.containsKey(instanceId)) {
+            throw new ShardingSphereException("Can not get dataSource of %.", instanceId);
         }
-        DataSource dataSource = dataSources.get(dataSourceName);
-        TrafficExecutorContextBuilder builder = getCachedTrafficExecutorContextBuilder(type);
+        DataSource dataSource = dataSources.get(instanceId);
+        TrafficExecutorContextBuilder builder = getCachedContextBuilder(type);
         return builder.build(logicSQL, dataSource.getConnection());
     }
     
-    private TrafficExecutorContextBuilder<?> getCachedTrafficExecutorContextBuilder(final String type) {
+    private TrafficExecutorContextBuilder<?> getCachedContextBuilder(final String type) {
         TrafficExecutorContextBuilder<?> result;
         if (null == (result = TYPE_CONTEXT_BUILDERS.get(type))) {
             result = TYPE_CONTEXT_BUILDERS.computeIfAbsent(type, key -> TypedSPIRegistry.getRegisteredService(TrafficExecutorContextBuilder.class, key, new Properties()));
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java
index f80c3ce..2c6e3ad 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/rule/TrafficRule.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.traffic.rule;
 
 import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.infra.binder.LogicSQL;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
 import org.apache.shardingsphere.infra.rule.identifier.scope.GlobalRule;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.sql.parser.sql.common.segment.generic.CommentSegment;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.AbstractSQLStatement;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.traffic.api.config.TrafficRuleConfiguration;
 import org.apache.shardingsphere.traffic.api.traffic.hint.HintTrafficAlgorithm;
 import org.apache.shardingsphere.traffic.api.traffic.hint.HintTrafficValue;
@@ -76,7 +76,7 @@ public final class TrafficRule implements GlobalRule {
         for (TrafficStrategyRule each : trafficStrategyRules) {
             TrafficAlgorithm trafficAlgorithm = trafficAlgorithms.get(each.getAlgorithmName());
             Preconditions.checkState(null != trafficAlgorithm, "Traffic strategy rule configuration must match traffic algorithm.");
-            if (match(trafficAlgorithm, logicSQL.getSqlStatementContext())) {
+            if (match(trafficAlgorithm, logicSQL)) {
                 return Optional.of(each);
             }
         }
@@ -84,10 +84,11 @@ public final class TrafficRule implements GlobalRule {
     }
     
     @SuppressWarnings("unchecked")
-    private boolean match(final TrafficAlgorithm trafficAlgorithm, final SQLStatementContext<?> statementContext) {
+    private boolean match(final TrafficAlgorithm trafficAlgorithm, final LogicSQL logicSQL) {
+        SQLStatement sqlStatement = logicSQL.getSqlStatementContext().getSqlStatement();
         if (trafficAlgorithm instanceof HintTrafficAlgorithm) {
             HintTrafficAlgorithm<Comparable<?>> hintTrafficAlgorithm = (HintTrafficAlgorithm<Comparable<?>>) trafficAlgorithm;
-            for (HintTrafficValue<Comparable<?>> each : getHintTrafficValues(statementContext)) {
+            for (HintTrafficValue<Comparable<?>> each : getHintTrafficValues(sqlStatement)) {
                 if (hintTrafficAlgorithm.match(each)) {
                     return true;
                 }
@@ -95,16 +96,16 @@ public final class TrafficRule implements GlobalRule {
         }
         if (trafficAlgorithm instanceof SegmentTrafficAlgorithm) {
             SegmentTrafficAlgorithm segmentTrafficAlgorithm = (SegmentTrafficAlgorithm) trafficAlgorithm;
-            SegmentTrafficValue segmentTrafficValue = new SegmentTrafficValue(statementContext.getSqlStatement());
+            SegmentTrafficValue segmentTrafficValue = new SegmentTrafficValue(sqlStatement, logicSQL.getSql());
             return segmentTrafficAlgorithm.match(segmentTrafficValue);
         }
         return false;
     }
     
-    private Collection<HintTrafficValue<Comparable<?>>> getHintTrafficValues(final SQLStatementContext<?> statementContext) {
+    private Collection<HintTrafficValue<Comparable<?>>> getHintTrafficValues(final SQLStatement sqlStatement) {
         Collection<HintTrafficValue<Comparable<?>>> result = new LinkedList<>();
-        if (statementContext.getSqlStatement() instanceof AbstractSQLStatement) {
-            for (CommentSegment each : ((AbstractSQLStatement) statementContext.getSqlStatement()).getCommentSegments()) {
+        if (sqlStatement instanceof AbstractSQLStatement) {
+            for (CommentSegment each : ((AbstractSQLStatement) sqlStatement).getCommentSegments()) {
                 result.add(new HintTrafficValue<>(each.getText()));
             }
         }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithmTest.java
similarity index 56%
copy from shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
copy to shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithmTest.java
index c5cb4fa..fcf37f3 100644
--- a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/main/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RandomTrafficLoadBalanceAlgorithmTest.java
@@ -17,23 +17,22 @@
 
 package org.apache.shardingsphere.traffic.algorithm.loadbalance;
 
-import org.apache.shardingsphere.traffic.spi.TrafficLoadBalanceAlgorithm;
+import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 
-/**
- * Random traffic load balance algorithm.
- */
-public final class RandomTrafficLoadBalanceAlgorithm implements TrafficLoadBalanceAlgorithm {
+import static org.junit.Assert.assertTrue;
+
+public final class RandomTrafficLoadBalanceAlgorithmTest {
     
-    @Override
-    public String getDataSourceName(final List<String> dataSourceNames) {
-        return dataSourceNames.get(ThreadLocalRandom.current().nextInt(dataSourceNames.size()));
-    }
+    private final RandomTrafficLoadBalanceAlgorithm randomAlgorithm = new RandomTrafficLoadBalanceAlgorithm();
     
-    @Override
-    public String getType() {
-        return "RANDOM";
+    @Test
+    public void assertGetInstanceId() {
+        List<String> instanceIds = Arrays.asList("127.0.0.1@3307", "127.0.0.1@3308");
+        assertTrue(instanceIds.contains(randomAlgorithm.getInstanceId("simple_traffic", instanceIds)));
+        assertTrue(instanceIds.contains(randomAlgorithm.getInstanceId("simple_traffic", instanceIds)));
+        assertTrue(instanceIds.contains(randomAlgorithm.getInstanceId("simple_traffic", instanceIds)));
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithmTest.java b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithmTest.java
new file mode 100644
index 0000000..b075e69
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-traffic/shardingsphere-traffic-core/src/test/java/org/apache/shardingsphere/traffic/algorithm/loadbalance/RoundRobinTrafficLoadBalanceAlgorithmTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.traffic.algorithm.loadbalance;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class RoundRobinTrafficLoadBalanceAlgorithmTest {
+    
+    private final RoundRobinTrafficLoadBalanceAlgorithm roundRobinAlgorithm = new RoundRobinTrafficLoadBalanceAlgorithm();
+    
+    @Before
+    @After
+    public void reset() throws NoSuchFieldException, IllegalAccessException {
+        Field field = RoundRobinTrafficLoadBalanceAlgorithm.class.getDeclaredField("COUNTS");
+        field.setAccessible(true);
+        ((ConcurrentHashMap<?, ?>) field.get(RoundRobinTrafficLoadBalanceAlgorithm.class)).clear();
+    }
+    
+    @Test
+    public void assertGetInstanceId() {
+        String instanceId1 = "127.0.0.1@3307";
+        String instanceId2 = "127.0.0.1@3308";
+        List<String> instanceIds = Arrays.asList(instanceId1, instanceId2);
+        assertThat(roundRobinAlgorithm.getInstanceId("simple_traffic", instanceIds), is(instanceId1));
+        assertThat(roundRobinAlgorithm.getInstanceId("simple_traffic", instanceIds), is(instanceId2));
+        assertThat(roundRobinAlgorithm.getInstanceId("simple_traffic", instanceIds), is(instanceId1));
+    }
+}