You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/06/21 08:46:28 UTC

[shardingsphere] branch master updated: add sharding conditions in update or delete sql (#6150)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4762a94  add sharding conditions in update or delete sql (#6150)
4762a94 is described below

commit 4762a94fd3ada8f64b37367b64015aa3c548b83b
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Sun Jun 21 16:46:04 2020 +0800

    add sharding conditions in update or delete sql (#6150)
    
    * add sharding conditions in update or delete sql
    
    * the SELECT privilege on *.* is not required, so remove it from the required-privileges check.
    
    Co-authored-by: Lucas <qi...@jd.com>
---
 .../scaling/core/config/RdbmsConfiguration.java    |  3 ++
 .../core/config/utils/SyncConfigurationUtil.java   | 43 +++++++++++++++++++---
 .../executor/importer/AbstractJDBCImporter.java    | 13 ++++---
 .../executor/importer/AbstractSqlBuilder.java      | 34 ++++++++---------
 .../core/execute/executor/record/RecordUtil.java   | 18 +++++++++
 .../job/preparer/ShardingScalingJobPreparer.java   |  2 +-
 .../splitter/InventoryDataTaskSplitter.java        |  2 +-
 .../importer/AbstractJDBCImporterTest.java         | 23 ++++++++++--
 .../executor/importer/AbstractSqlBuilderTest.java  | 33 ++++++++++++++---
 .../scaling/mysql/MySQLDataSourceChecker.java      |  4 +-
 .../scaling/mysql/MySQLDataSourceCheckerTest.java  |  4 +-
 11 files changed, 135 insertions(+), 44 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
index 6933eb1..675c1e8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
@@ -23,6 +23,7 @@ import lombok.Setter;
 import lombok.SneakyThrows;
 
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Relational database management system configuration.
@@ -36,6 +37,8 @@ public final class RdbmsConfiguration implements Cloneable {
     
     private String tableName;
     
+    private Map<String, Set<String>> shardingColumnsMap;
+    
     private String whereCondition;
     
     private int spiltNum;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
index 745b471..3f7c5e3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
@@ -17,21 +17,29 @@
 
 package org.apache.shardingsphere.scaling.core.config.utils;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
+import org.apache.shardingsphere.infra.config.DataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.infra.config.DataSourceConfiguration;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.ShardingTableRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.strategy.ComplexShardingStrategyConfiguration;
+import org.apache.shardingsphere.sharding.api.config.strategy.ShardingStrategyConfiguration;
+import org.apache.shardingsphere.sharding.api.config.strategy.StandardShardingStrategyConfiguration;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Sync configuration Util.
@@ -52,7 +60,7 @@ public final class SyncConfigurationUtil {
         Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(sourceRule, sourceDatasource.keySet());
         for (String each : dataSourceTableNameMap.keySet()) {
             RdbmsConfiguration dumperConfiguration = createDumperConfiguration(sourceDatasource.get(each));
-            RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration);
+            RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule);
             Map<String, String> tableNameMap = dataSourceTableNameMap.get(each);
             result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), tableNameMap, dumperConfiguration, importerConfiguration));
         }
@@ -110,13 +118,36 @@ public final class SyncConfigurationUtil {
         return result;
     }
     
-    private static RdbmsConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration) {
+    private static RdbmsConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration, final ShardingRuleConfiguration shardingRuleConfig) {
         RdbmsConfiguration result = new RdbmsConfiguration();
         JDBCDataSourceConfiguration importerDataSourceConfiguration = new JDBCDataSourceConfiguration(
                 scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUrl(),
                 scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUsername(),
                 scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getPassword());
         result.setDataSourceConfiguration(importerDataSourceConfiguration);
+        result.setShardingColumnsMap(toShardingColumnsMap(shardingRuleConfig));
         return result;
     }
+    
+    private static Map<String, Set<String>> toShardingColumnsMap(final ShardingRuleConfiguration shardingRuleConfig) {
+        Map<String, Set<String>> result = Maps.newConcurrentMap();
+        for (ShardingTableRuleConfiguration each : shardingRuleConfig.getTables()) {
+            Set<String> shardingColumns = Sets.newHashSet();
+            shardingColumns.addAll(extractShardingColumns(each.getDatabaseShardingStrategy()));
+            shardingColumns.addAll(extractShardingColumns(each.getTableShardingStrategy()));
+            result.put(each.getLogicTable(), shardingColumns);
+        }
+        return result;
+    }
+    
+    private static Set<String> extractShardingColumns(final ShardingStrategyConfiguration shardingStrategy) {
+        if (shardingStrategy instanceof StandardShardingStrategyConfiguration) {
+            return Sets.newHashSet(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn());
+        }
+        if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
+            return Sets.newHashSet(((ComplexShardingStrategyConfiguration) shardingStrategy).getShardingColumns().split(","));
+        }
+        return Collections.EMPTY_SET;
+    }
+    
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index d688785..5cbfbb7 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -131,10 +131,11 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
     }
     
     private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
+        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
         List<Column> values = new ArrayList<>();
         values.addAll(RecordUtil.extractUpdatedColumns(record));
-        values.addAll(RecordUtil.extractPrimaryColumns(record));
-        String updateSql = sqlBuilder.buildUpdateSQL(record);
+        values.addAll(conditionColumns);
+        String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
         PreparedStatement ps = connection.prepareStatement(updateSql);
         for (int i = 0; i < values.size(); i++) {
             ps.setObject(i + 1, values.get(i).getValue());
@@ -143,11 +144,11 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
     }
     
     private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
-        String deleteSql = sqlBuilder.buildDeleteSQL(record);
-        List<Column> primaryKeys = RecordUtil.extractPrimaryColumns(record);
+        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
+        String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
         PreparedStatement ps = connection.prepareStatement(deleteSql);
-        for (int i = 0; i < primaryKeys.size(); i++) {
-            ps.setObject(i + 1, primaryKeys.get(i).getValue());
+        for (int i = 0; i < conditionColumns.size(); i++) {
+            ps.setObject(i + 1, conditionColumns.get(i).getValue());
         }
         ps.execute();
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java
index e1d858b..391ab09 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilder.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 import com.google.common.collect.Collections2;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 
 import java.util.Collection;
 import java.util.List;
@@ -84,12 +83,13 @@ public abstract class AbstractSqlBuilder {
      * Build update SQL.
      *
      * @param dataRecord data record
+     * @param conditionColumns condition columns
      * @return update SQL
      */
-    public String buildUpdateSQL(final DataRecord dataRecord) {
+    public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
-            sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), RecordUtil.extractPrimaryColumns(dataRecord)));
+            sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
         }
         StringBuilder updatedColumnString = new StringBuilder();
         for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
@@ -99,13 +99,8 @@ public abstract class AbstractSqlBuilder {
         return String.format(sqlCacheMap.get(sqlCacheKey), updatedColumnString.toString());
     }
     
-    private String buildUpdateSQLInternal(final String tableName, final Collection<Column> extractPrimaryColumns) {
-        StringBuilder where = new StringBuilder();
-        for (Column each : extractPrimaryColumns) {
-            where.append(String.format("%s%s%s = ?,", getLeftIdentifierQuoteString(), each.getName(), getRightIdentifierQuoteString()));
-        }
-        where.setLength(where.length() - 1);
-        return String.format("UPDATE %s%s%s SET %%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), where.toString());
+    private String buildUpdateSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+        return String.format("UPDATE %s%s%s SET %%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), buildWhereSQL(conditionColumns));
     }
     
     private Collection<Column> extractUpdatedColumns(final Collection<Column> columns) {
@@ -116,22 +111,27 @@ public abstract class AbstractSqlBuilder {
      * Build delete SQL.
      *
      * @param dataRecord data record
+     * @param conditionColumns condition columns
      * @return delete SQL
      */
-    public String buildDeleteSQL(final DataRecord dataRecord) {
+    public String buildDeleteSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
-            sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord.getTableName(), RecordUtil.extractPrimaryColumns(dataRecord)));
+            sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord.getTableName(), conditionColumns));
         }
         return sqlCacheMap.get(sqlCacheKey);
     }
     
-    private String buildDeleteSQLInternal(final String tableName, final Collection<Column> primaryColumns) {
+    private String buildDeleteSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+        return String.format("DELETE FROM %s%s%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), buildWhereSQL(conditionColumns));
+    }
+    
+    private String buildWhereSQL(final Collection<Column> conditionColumns) {
         StringBuilder where = new StringBuilder();
-        for (Column each : primaryColumns) {
-            where.append(String.format("%s%s%s = ?,", getLeftIdentifierQuoteString(), each.getName(), getRightIdentifierQuoteString()));
+        for (Column each : conditionColumns) {
+            where.append(String.format("%s%s%s = ? and ", getLeftIdentifierQuoteString(), each.getName(), getRightIdentifierQuoteString()));
         }
-        where.setLength(where.length() - 1);
-        return String.format("DELETE FROM %s%s%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), where.toString());
+        where.setLength(where.length() - 5);
+        return where.toString();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
index 7f9ac78..9c39c7b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.record;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Record util.
@@ -42,6 +43,23 @@ public final class RecordUtil {
     }
     
     /**
+     * Extract condition columns(include primary and sharding columns) from data record.
+     *
+     * @param dataRecord data record
+     * @param shardingColumns sharding columns
+     * @return condition columns
+     */
+    public static List<Column> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
+        List<Column> result = new ArrayList<>();
+        for (Column each : dataRecord.getColumns()) {
+            if (each.isPrimaryKey() || shardingColumns.contains(each.getName())) {
+                result.add(each);
+            }
+        }
+        return result;
+    }
+    
+    /**
      * Extract updated columns from data record.
      *
      * @param dataRecord data record
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 88704ff..fd0f55a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -58,8 +58,8 @@ public final class ShardingScalingJobPreparer {
         String databaseType = shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration().getDatabaseType().getName();
         try (DataSourceManager dataSourceManager = new DataSourceManager(shardingScalingJob.getSyncConfigurations())) {
             checkDatasources(databaseType, dataSourceManager);
-            splitInventoryDataTasks(shardingScalingJob, dataSourceManager);
             initIncrementalDataTasks(databaseType, shardingScalingJob, dataSourceManager);
+            splitInventoryDataTasks(shardingScalingJob, dataSourceManager);
         } catch (PrepareFailedException ex) {
             log.warn("Preparing sharding scaling job {} : {} failed", shardingScalingJob.getJobId(), shardingScalingJob.getJobName(), ex);
             shardingScalingJob.setStatus(SyncTaskControlStatus.PREPARING_FAILURE.name());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index 3232338..883af8f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -123,7 +123,7 @@ public final class InventoryDataTaskSplitter {
             long min = rs.getLong(1);
             long max = rs.getLong(2);
             long step = (max - min) / concurrency;
-            for (int i = 0; i < concurrency; i++) {
+            for (int i = 0; i < concurrency && min <= max; i++) {
                 RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration);
                 if (i < concurrency - 1) {
                     splitDumperConfig.setWhereCondition(String.format("WHERE %s BETWEEN %d AND %d", primaryKey, min, min + step));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 54c721f..5355fc8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -17,11 +17,14 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
@@ -35,8 +38,11 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import javax.sql.DataSource;
 
 import static org.mockito.Mockito.verify;
@@ -49,9 +55,9 @@ public final class AbstractJDBCImporterTest {
     
     private static final String INSERT_SQL = "INSERT INTO test_table (id,user,status) VALUES(?,?,?)";
     
-    private static final String DELETE_SQL = "DELETE FROM test_table WHERE id = ?";
+    private static final String DELETE_SQL = "DELETE FROM test_table WHERE id = ? and user = ?";
     
-    private static final String UPDATE_SQL = "UPDATE test_table SET user = ?,status = ? WHERE id = ?";
+    private static final String UPDATE_SQL = "UPDATE test_table SET user = ?,status = ? WHERE id = ? and user = ?";
     
     @Mock
     private DataSourceManager dataSourceManager;
@@ -107,27 +113,33 @@ public final class AbstractJDBCImporterTest {
     @Test
     public void assertDeleteDataRecord() throws SQLException {
         DataRecord deleteRecord = getDataRecord("DELETE");
-        when(sqlBuilder.buildDeleteSQL(deleteRecord)).thenReturn(DELETE_SQL);
+        when(sqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
         when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(deleteRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 1);
+        verify(preparedStatement).setObject(2, 10);
         verify(preparedStatement).execute();
     }
     
     @Test
     public void assertUpdateDataRecord() throws SQLException {
         DataRecord updateRecord = getDataRecord("UPDATE");
-        when(sqlBuilder.buildUpdateSQL(updateRecord)).thenReturn(UPDATE_SQL);
+        when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 10);
         verify(preparedStatement).setObject(2, "UPDATE");
         verify(preparedStatement).setObject(3, 1);
+        verify(preparedStatement).setObject(4, 10);
         verify(preparedStatement).execute();
     }
     
+    private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+        return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("user"));
+    }
+    
     private List<Record> mockRecords(final DataRecord dataRecord) {
         List<Record> result = new LinkedList<>();
         result.add(dataRecord);
@@ -149,6 +161,9 @@ public final class AbstractJDBCImporterTest {
         RdbmsConfiguration result = new RdbmsConfiguration();
         result.setTableName(TABLE_NAME);
         result.setDataSourceConfiguration(dataSourceConfiguration);
+        Map<String, Set<String>> shardingColumnsMap = Maps.newHashMap();
+        shardingColumnsMap.put("test_table", Sets.newHashSet("user"));
+        result.setShardingColumnsMap(shardingColumnsMap);
         return result;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index 875df99..22f9ea8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -17,13 +17,17 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 
+import com.google.common.collect.Sets;
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collection;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
@@ -51,25 +55,44 @@ public class AbstractSqlBuilderTest {
     @Test
     public void assertBuildInsertSql() {
         String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
-        assertThat(actual, is("INSERT INTO `t1`(`id`,`c1`,`c2`,`c3`) VALUES(?,?,?,?)"));
+        assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
     }
     
     @Test
-    public void assertBuildUpdateSql() {
-        String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"));
+    public void assertBuildUpdateSqlWithPrimaryKey() {
+        String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
         assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
     }
     
     @Test
-    public void assertBuildDeleteSql() {
-        String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"));
+    public void assertBuildUpdateSqlWithShardingColumns() {
+        DataRecord dataRecord = mockDataRecord("t2");
+        String actual = sqlBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord));
+        assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
+    }
+    
+    @Test
+    public void assertBuildDeleteSqlWithPrimaryKey() {
+        String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"), RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
         assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ?"));
     }
     
+    @Test
+    public void assertBuildDeleteSqlWithConditionColumns() {
+        DataRecord dataRecord = mockDataRecord("t3");
+        String actual = sqlBuilder.buildDeleteSQL(dataRecord, mockConditionColumns(dataRecord));
+        assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
+    }
+    
+    private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+        return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("sc"));
+    }
+    
     private DataRecord mockDataRecord(final String tableName) {
         DataRecord result = new DataRecord(new NopLogPosition(), 4);
         result.setTableName(tableName);
         result.addColumn(new Column("id", "", false, true));
+        result.addColumn(new Column("sc", "", false, false));
         result.addColumn(new Column("c1", "", true, false));
         result.addColumn(new Column("c2", "", true, false));
         result.addColumn(new Column("c3", "", true, false));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
index 8bac86f..684937d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceChecker.java
@@ -37,7 +37,7 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
     
     private static final String SHOW_GRANTS_SQL = "SHOW GRANTS";
     
-    private static final String[][] REQUIRED_PRIVILEGES = {{"ALL PRIVILEGES", "ON *.*"}, {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT", "ON *.*"}};
+    private static final String[][] REQUIRED_PRIVILEGES = {{"ALL PRIVILEGES", "ON *.*"}, {"REPLICATION SLAVE", "REPLICATION CLIENT", "ON *.*"}};
     
     private static final String SHOW_VARIABLES_SQL = "SHOW VARIABLES LIKE '%s'";
     
@@ -68,7 +68,7 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
         } catch (SQLException e) {
             throw new PrepareFailedException("Source datasource check privileges failed.");
         }
-        throw new PrepareFailedException("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges.");
+        throw new PrepareFailedException("Source datasource is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges.");
     }
     
     private boolean matchPrivileges(final String privilege) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
index cb2835a..f4f58f7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLDataSourceCheckerTest.java
@@ -70,7 +70,7 @@ public class MySQLDataSourceCheckerTest {
     @Test
     public void assertCheckPrivilegeWithParticularSuccess() throws SQLException {
         when(resultSet.next()).thenReturn(true);
-        when(resultSet.getString(1)).thenReturn("GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '%'@'%'");
+        when(resultSet.getString(1)).thenReturn("GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '%'@'%'");
         dataSourceChecker.checkPrivilege(dataSources);
         verify(preparedStatement, Mockito.times(1)).executeQuery();
     }
@@ -89,7 +89,7 @@ public class MySQLDataSourceCheckerTest {
         try {
             dataSourceChecker.checkPrivilege(dataSources);
         } catch (PrepareFailedException checkFailedEx) {
-            assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges."));
+            assertThat(checkFailedEx.getMessage(), is("Source datasource is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges."));
         }
     }