You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/11 06:20:18 UTC

[shardingsphere] branch master updated: For optimization scaling importer performance (#8127)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f3153  For optimization scaling importer performance (#8127)
e6f3153 is described below

commit e6f3153f45a53e74daa82a00b5ac8d81e19a8986
Author: avalon5666 <64...@users.noreply.github.com>
AuthorDate: Wed Nov 11 14:08:20 2020 +0800

    For optimization scaling importer performance (#8127)
    
    * Revert "Use insert on duplicate key update in mysql insert (#8004)"
    
    This reverts commit 7a2b03c48d0b8c5d967df1d43607484ecd49a6e2.
    
    * Use insert on duplicate key update in mysql insert (#8004)
    
    * Support update primary key column
---
 .../executor/importer/AbstractJDBCImporter.java    | 44 ++++++++----
 .../executor/importer/AbstractSQLBuilder.java      | 82 +++++++++-------------
 .../execute/executor/importer/PreparedSQL.java     | 39 ----------
 .../core/execute/executor/record/Column.java       |  9 +++
 .../core/execute/executor/record/DataRecord.java   |  6 +-
 .../core/execute/executor/record/RecordUtil.java   | 55 ++++++++-------
 .../importer/AbstractJDBCImporterTest.java         | 47 +++++++++++--
 .../executor/importer/AbstractSqlBuilderTest.java  | 74 +++++--------------
 .../fixture/FixtureDataConsistencyChecker.java     |  3 +-
 .../scaling/mysql/MySQLBinlogDumper.java           |  5 +-
 .../scaling/mysql/MySQLDataConsistencyChecker.java |  3 +-
 .../scaling/mysql/MySQLImporter.java               |  7 +-
 .../scaling/mysql/MySQLSQLBuilder.java             | 27 +++----
 .../scaling/mysql/MySQLImporterTest.java           |  8 +--
 ...qlBuilderTest.java => MySQLSQLBuilderTest.java} | 29 ++++----
 .../PostgreSQLDataConsistencyChecker.java          |  3 +-
 .../scaling/postgresql/PostgreSQLImporter.java     |  7 +-
 .../scaling/postgresql/PostgreSQLSQLBuilder.java   | 20 ++----
 .../scaling/postgresql/PostgreSQLImporterTest.java |  8 +--
 .../postgresql/PostgreSQLSqlBuilderTest.java       |  8 +--
 20 files changed, 210 insertions(+), 274 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index e36eabc..3ca1ba7 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -25,9 +25,11 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
 import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
 
 import javax.sql.DataSource;
@@ -37,8 +39,6 @@ import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * Abstract JDBC importer implementation.
@@ -58,7 +58,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
     protected AbstractJDBCImporter(final ImporterConfiguration importerConfig, final DataSourceManager dataSourceManager) {
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
-        sqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
+        sqlBuilder = createSQLBuilder();
     }
     
     /**
@@ -66,7 +66,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
      *
      * @return SQL builder
      */
-    protected abstract AbstractSQLBuilder createSQLBuilder(Map<String, Set<String>> shardingColumnsMap);
+    protected abstract AbstractSQLBuilder createSQLBuilder();
     
     @Override
     public final void start() {
@@ -140,25 +140,41 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
     }
     
     private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
+        String insertSql = sqlBuilder.buildInsertSQL(record);
+        PreparedStatement ps = connection.prepareStatement(insertSql);
+        ps.setQueryTimeout(30);
         try {
-            executeSQL(connection, record, sqlBuilder.buildInsertSQL(record));
+            for (int i = 0; i < record.getColumnCount(); i++) {
+                ps.setObject(i + 1, record.getColumn(i).getValue());
+            }
+            ps.execute();
         } catch (final SQLIntegrityConstraintViolationException ignored) {
         }
     }
     
     private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
-        executeSQL(connection, record, sqlBuilder.buildUpdateSQL(record));
+        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
+        List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
+        String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
+        PreparedStatement ps = connection.prepareStatement(updateSql);
+        for (int i = 0; i < updatedColumns.size(); i++) {
+            ps.setObject(i + 1, updatedColumns.get(i).getValue());
+        }
+        for (int i = 0; i < conditionColumns.size(); i++) {
+            Column keyColumn = conditionColumns.get(i);
+            ps.setObject(updatedColumns.size() + i + 1,
+                    // sharding column can not be updated
+                    (keyColumn.isPrimaryKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
+        }
+        ps.execute();
     }
     
     private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
-        executeSQL(connection, record, sqlBuilder.buildDeleteSQL(record));
-    }
-    
-    private void executeSQL(final Connection connection, final DataRecord record, final PreparedSQL preparedSQL) throws SQLException {
-        PreparedStatement ps = connection.prepareStatement(preparedSQL.getSql());
-        for (int i = 0; i < preparedSQL.getValuesIndex().size(); i++) {
-            int columnIndex = preparedSQL.getValuesIndex().get(i);
-            ps.setObject(i + 1, record.getColumn(columnIndex).getValue());
+        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
+        String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
+        PreparedStatement ps = connection.prepareStatement(deleteSql);
+        for (int i = 0; i < conditionColumns.size(); i++) {
+            ps.setObject(i + 1, conditionColumns.get(i).getValue());
         }
         ps.execute();
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
index fc191df..e102b05 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
@@ -17,21 +17,18 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 
-import lombok.RequiredArgsConstructor;
+import com.google.common.collect.Collections2;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
  * Abstract SQL builder.
  */
-@RequiredArgsConstructor
 public abstract class AbstractSQLBuilder {
     
     private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
@@ -40,9 +37,7 @@ public abstract class AbstractSQLBuilder {
     
     private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
     
-    private final Map<String, Set<String>> shardingColumnsMap;
-    
-    private final ConcurrentMap<String, PreparedSQL> sqlCacheMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();
     
     /**
      * Get left identifier quote string.
@@ -72,90 +67,79 @@ public abstract class AbstractSQLBuilder {
      * Build insert SQL.
      *
      * @param dataRecord data record
-     * @return insert prepared SQL
+     * @return insert SQL
      */
-    public PreparedSQL buildInsertSQL(final DataRecord dataRecord) {
+    public String buildInsertSQL(final DataRecord dataRecord) {
         String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
-            sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord));
+            sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord.getTableName(), dataRecord.getColumns()));
         }
         return sqlCacheMap.get(sqlCacheKey);
     }
     
-    protected PreparedSQL buildInsertSQLInternal(final DataRecord dataRecord) {
+    private String buildInsertSQLInternal(final String tableName, final List<Column> columns) {
         StringBuilder columnsLiteral = new StringBuilder();
         StringBuilder holder = new StringBuilder();
-        List<Integer> valuesIndex = new ArrayList<>();
-        for (int i = 0; i < dataRecord.getColumnCount(); i++) {
-            columnsLiteral.append(String.format("%s,", quote(dataRecord.getColumn(i).getName())));
+        for (Column each : columns) {
+            columnsLiteral.append(String.format("%s,", quote(each.getName())));
             holder.append("?,");
-            valuesIndex.add(i);
         }
         columnsLiteral.setLength(columnsLiteral.length() - 1);
         holder.setLength(holder.length() - 1);
-        return new PreparedSQL(
-                String.format("INSERT INTO %s(%s) VALUES(%s)", quote(dataRecord.getTableName()), columnsLiteral, holder),
-                valuesIndex);
+        return String.format("INSERT INTO %s(%s) VALUES(%s)", quote(tableName), columnsLiteral, holder);
     }
     
     /**
      * Build update SQL.
      *
      * @param dataRecord data record
-     * @return update prepared SQL
+     * @param conditionColumns condition columns
+     * @return update SQL
      */
-    public PreparedSQL buildUpdateSQL(final DataRecord dataRecord) {
+    public String buildUpdateSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
-            sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord));
+            sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
         }
         StringBuilder updatedColumnString = new StringBuilder();
-        List<Integer> valuesIndex = new ArrayList<>();
-        for (Integer each : RecordUtil.extractUpdatedColumns(dataRecord)) {
-            updatedColumnString.append(String.format("%s = ?,", quote(dataRecord.getColumn(each).getName())));
-            valuesIndex.add(each);
+        for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
+            updatedColumnString.append(String.format("%s = ?,", quote(each.getName())));
         }
         updatedColumnString.setLength(updatedColumnString.length() - 1);
-        PreparedSQL preparedSQL = sqlCacheMap.get(sqlCacheKey);
-        valuesIndex.addAll(preparedSQL.getValuesIndex());
-        return new PreparedSQL(
-                String.format(preparedSQL.getSql(), updatedColumnString),
-                valuesIndex);
+        return String.format(sqlCacheMap.get(sqlCacheKey), updatedColumnString);
+    }
+    
+    private String buildUpdateSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+        return String.format("UPDATE %s SET %%s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
     }
     
-    private PreparedSQL buildUpdateSQLInternal(final DataRecord dataRecord) {
-        List<Integer> valuesIndex = new ArrayList<>();
-        return new PreparedSQL(
-                String.format("UPDATE %s SET %%s WHERE %s", quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, valuesIndex)),
-                valuesIndex);
+    private Collection<Column> extractUpdatedColumns(final Collection<Column> columns) {
+        return Collections2.filter(columns, Column::isUpdated);
     }
     
     /**
      * Build delete SQL.
      *
      * @param dataRecord data record
-     * @return delete prepared SQL
+     * @param conditionColumns condition columns
+     * @return delete SQL
      */
-    public PreparedSQL buildDeleteSQL(final DataRecord dataRecord) {
+    public String buildDeleteSQL(final DataRecord dataRecord, final Collection<Column> conditionColumns) {
         String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + dataRecord.getTableName();
         if (!sqlCacheMap.containsKey(sqlCacheKey)) {
-            sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord));
+            sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord.getTableName(), conditionColumns));
         }
         return sqlCacheMap.get(sqlCacheKey);
     }
     
-    private PreparedSQL buildDeleteSQLInternal(final DataRecord dataRecord) {
-        List<Integer> columnsIndex = new ArrayList<>();
-        return new PreparedSQL(
-                String.format("DELETE FROM %s WHERE %s", quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, columnsIndex)),
-                columnsIndex);
+    private String buildDeleteSQLInternal(final String tableName, final Collection<Column> conditionColumns) {
+        return String.format("DELETE FROM %s WHERE %s", quote(tableName), buildWhereSQL(conditionColumns));
     }
     
-    private String buildWhereSQL(final DataRecord dataRecord, final List<Integer> valuesIndex) {
+    private String buildWhereSQL(final Collection<Column> conditionColumns) {
         StringBuilder where = new StringBuilder();
-        for (Integer each : RecordUtil.extractConditionColumns(dataRecord, shardingColumnsMap.get(dataRecord.getTableName()))) {
-            where.append(String.format("%s = ? and ", quote(dataRecord.getColumn(each).getName())));
-            valuesIndex.add(each);
+        for (Column each : conditionColumns) {
+            where.append(String.format("%s = ? and ", quote(each.getName())));
         }
         where.setLength(where.length() - 5);
         return where.toString();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java
deleted file mode 100644
index fe07e10..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.execute.executor.importer;
-
-import lombok.Getter;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Prepared SQL, include complete sql and complete values index list.
- */
-@Getter
-public class PreparedSQL {
-    
-    private final String sql;
-    
-    private final List<Integer> valuesIndex;
-    
-    public PreparedSQL(final String sql, final List<Integer> valuesIndex) {
-        this.sql = sql;
-        this.valuesIndex = Collections.unmodifiableList(valuesIndex);
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java
index 89d80b7..1c66a9f 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Column.java
@@ -29,12 +29,21 @@ public final class Column {
     
     private final String name;
     
+    /**
+     * Value is available only when the primary key column is updated.
+     */
+    private final Object oldValue;
+    
     private final Object value;
     
     private final boolean updated;
     
     private final boolean primaryKey;
     
+    public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
+        this(name, null, value, updated, primaryKey);
+    }
+    
     @Override
     public String toString() {
         return String.format("%s=%s", name, value);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
index ffb88cd..e8594c0 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
@@ -31,6 +31,8 @@ import java.util.List;
 /**
  * Data record.
  */
+@Setter
+@Getter
 @EqualsAndHashCode(of = {"tableName", "primaryKeyValue"}, callSuper = false)
 @ToString
 public final class DataRecord extends Record {
@@ -39,12 +41,8 @@ public final class DataRecord extends Record {
     
     private final List<Object> primaryKeyValue = new LinkedList<>();
     
-    @Setter
-    @Getter
     private String type;
     
-    @Setter
-    @Getter
     private String tableName;
     
     public DataRecord(final Position position, final int columnCount) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
index 0a72b08..d9e38bb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
@@ -20,10 +20,9 @@ package org.apache.shardingsphere.scaling.core.execute.executor.record;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Record utility.
@@ -32,45 +31,51 @@ import java.util.stream.IntStream;
 public final class RecordUtil {
     
     /**
-     * Extract primary columns index from data record.
+     * Extract primary columns from data record.
      *
      * @param dataRecord data record
-     * @return primary columns index
+     * @return primary columns
      */
-    public static List<Integer> extractPrimaryColumns(final DataRecord dataRecord) {
-        return IntStream.range(0, dataRecord.getColumnCount())
-                .filter(each -> dataRecord.getColumn(each).isPrimaryKey())
-                .mapToObj(each -> each)
-                .collect(Collectors.toList());
+    public static List<Column> extractPrimaryColumns(final DataRecord dataRecord) {
+        List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
+        for (Column each : dataRecord.getColumns()) {
+            if (each.isPrimaryKey()) {
+                result.add(each);
+            }
+        }
+        return result;
     }
     
     /**
-     * Extract condition columns(include primary and sharding columns) index from data record.
+     * Extract condition columns(include primary and sharding columns) from data record.
      *
      * @param dataRecord data record
      * @param shardingColumns sharding columns
-     * @return condition columns index
+     * @return condition columns
      */
-    public static List<Integer> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
-        return IntStream.range(0, dataRecord.getColumnCount())
-                .filter(each -> {
-                    Column column = dataRecord.getColumn(each);
-                    return column.isPrimaryKey() || shardingColumns.contains(column.getName());
-                })
-                .mapToObj(each -> each)
-                .collect(Collectors.toList());
+    public static List<Column> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
+        List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
+        for (Column each : dataRecord.getColumns()) {
+            if (each.isPrimaryKey() || shardingColumns.contains(each.getName())) {
+                result.add(each);
+            }
+        }
+        return result;
     }
     
     /**
      * Extract updated columns from data record.
      *
      * @param dataRecord data record
-     * @return updated columns index
+     * @return updated columns
      */
-    public static List<Integer> extractUpdatedColumns(final DataRecord dataRecord) {
-        return IntStream.range(0, dataRecord.getColumnCount())
-                .filter(each -> dataRecord.getColumn(each).isUpdated())
-                .mapToObj(each -> each)
-                .collect(Collectors.toList());
+    public static List<Column> extractUpdatedColumns(final DataRecord dataRecord) {
+        List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
+        for (Column each : dataRecord.getColumns()) {
+            if (each.isUpdated()) {
+                result.add(each);
+            }
+        }
+        return result;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 3283dd0..637131d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -17,21 +17,22 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -39,11 +40,13 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -86,7 +89,7 @@ public final class AbstractJDBCImporterTest {
         jdbcImporter = new AbstractJDBCImporter(getImporterConfiguration(), dataSourceManager) {
             
             @Override
-            protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+            protected AbstractSQLBuilder createSQLBuilder() {
                 return sqlBuilder;
             }
         };
@@ -96,9 +99,9 @@ public final class AbstractJDBCImporterTest {
     }
     
     @Test
-    public void assertInsertDataRecord() throws SQLException {
+    public void assertWriteInsertDataRecord() throws SQLException {
         DataRecord insertRecord = getDataRecord("INSERT");
-        when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(new PreparedSQL(INSERT_SQL, Lists.newArrayList(0, 1, 2)));
+        when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
         when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(insertRecord));
         jdbcImporter.run();
@@ -111,7 +114,7 @@ public final class AbstractJDBCImporterTest {
     @Test
     public void assertDeleteDataRecord() throws SQLException {
         DataRecord deleteRecord = getDataRecord("DELETE");
-        when(sqlBuilder.buildDeleteSQL(deleteRecord)).thenReturn(new PreparedSQL(DELETE_SQL, Lists.newArrayList(0, 1)));
+        when(sqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
         when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(deleteRecord));
         jdbcImporter.run();
@@ -123,7 +126,7 @@ public final class AbstractJDBCImporterTest {
     @Test
     public void assertUpdateDataRecord() throws SQLException {
         DataRecord updateRecord = getDataRecord("UPDATE");
-        when(sqlBuilder.buildUpdateSQL(updateRecord)).thenReturn(new PreparedSQL(UPDATE_SQL, Lists.newArrayList(1, 2, 0, 1)));
+        when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
         when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
         jdbcImporter.run();
@@ -134,6 +137,36 @@ public final class AbstractJDBCImporterTest {
         verify(preparedStatement).execute();
     }
     
+    @Test
+    public void assertUpdatePrimaryKeyDataRecord() throws SQLException {
+        DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
+        when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
+        when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
+        when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
+        jdbcImporter.run();
+        InOrder inOrder = inOrder(preparedStatement);
+        inOrder.verify(preparedStatement).setObject(1, 2);
+        inOrder.verify(preparedStatement).setObject(2, 10);
+        inOrder.verify(preparedStatement).setObject(3, "UPDATE");
+        inOrder.verify(preparedStatement).setObject(4, 1);
+        inOrder.verify(preparedStatement).setObject(5, 10);
+        inOrder.verify(preparedStatement).execute();
+    }
+    
+    private DataRecord getUpdatePrimaryKeyDataRecord() {
+        DataRecord result = new DataRecord(new NopPosition(), 3);
+        result.setTableName(TABLE_NAME);
+        result.setType("UPDATE");
+        result.addColumn(new Column("id", 1, 2, true, true));
+        result.addColumn(new Column("user", 10, true, false));
+        result.addColumn(new Column("status", "UPDATE", true, false));
+        return result;
+    }
+    
+    private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+        return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("user"));
+    }
+    
     private List<Record> mockRecords(final DataRecord dataRecord) {
         List<Record> result = new LinkedList<>();
         result.add(dataRecord);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index 4cc12e4..c61a481 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -20,37 +20,29 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 import com.google.common.collect.Sets;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
-import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.Map;
+import java.util.Collection;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.class)
 public class AbstractSqlBuilderTest {
     
-    @Mock
-    private Map shardingColumnsMap;
-    
     private AbstractSQLBuilder sqlBuilder;
     
     @Before
     public void setUp() {
-        sqlBuilder = new AbstractSQLBuilder(shardingColumnsMap) {
+        sqlBuilder = new AbstractSQLBuilder() {
             
             @Override
             protected String getLeftIdentifierQuoteString() {
                 return "`";
             }
-            
+    
             @Override
             protected String getRightIdentifierQuoteString() {
                 return "`";
@@ -60,66 +52,38 @@ public class AbstractSqlBuilderTest {
     
     @Test
     public void assertBuildInsertSQL() {
-        PreparedSQL actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
-        assertThat(actual.getSql(), is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1, 2, 3, 4));
+        String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+        assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithPrimaryKey() {
-        when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet());
-        PreparedSQL actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"));
-        assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 3, 4, 0));
+        String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"), RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
+        assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ?"));
     }
     
     @Test
     public void assertBuildUpdateSQLWithShardingColumns() {
-        when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet("sc"));
         DataRecord dataRecord = mockDataRecord("t2");
-        PreparedSQL actual = sqlBuilder.buildUpdateSQL(dataRecord);
-        assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 3, 4, 0, 1));
-    }
-    
-    @Test
-    public void assertBuildUpdateSQLWithShardingColumnsUseCache() {
-        when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet("sc"));
-        DataRecord dataRecord = mockDataRecord("t2");
-        PreparedSQL actual = sqlBuilder.buildUpdateSQL(dataRecord);
-        assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 3, 4, 0, 1));
-        actual = sqlBuilder.buildUpdateSQL(mockDataRecord2("t2"));
-        assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(2, 4, 0, 1));
-    }
-    
-    private DataRecord mockDataRecord2(final String tableName) {
-        DataRecord result = new DataRecord(new NopPosition(), 4);
-        result.setTableName(tableName);
-        result.addColumn(new Column("id", "", false, true));
-        result.addColumn(new Column("sc", "", false, false));
-        result.addColumn(new Column("c1", "", true, false));
-        result.addColumn(new Column("c2", "", false, false));
-        result.addColumn(new Column("c3", "", true, false));
-        return result;
+        String actual = sqlBuilder.buildUpdateSQL(dataRecord, mockConditionColumns(dataRecord));
+        assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ? WHERE `id` = ? and `sc` = ?"));
     }
     
     @Test
     public void assertBuildDeleteSQLWithPrimaryKey() {
-        when(shardingColumnsMap.get("t3")).thenReturn(Sets.newHashSet());
-        PreparedSQL actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"));
-        assertThat(actual.getSql(), is("DELETE FROM `t3` WHERE `id` = ?"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0));
+        String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"), RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
+        assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ?"));
     }
     
     @Test
-    public void assertBuildDeleteSQLWithShardingColumns() {
-        when(shardingColumnsMap.get("t3")).thenReturn(Sets.newHashSet("sc"));
+    public void assertBuildDeleteSQLWithConditionColumns() {
         DataRecord dataRecord = mockDataRecord("t3");
-        PreparedSQL actual = sqlBuilder.buildDeleteSQL(dataRecord);
-        assertThat(actual.getSql(), is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1));
+        String actual = sqlBuilder.buildDeleteSQL(dataRecord, mockConditionColumns(dataRecord));
+        assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
+    }
+    
+    private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
+        return RecordUtil.extractConditionColumns(dataRecord, Sets.newHashSet("sc"));
     }
     
     private DataRecord mockDataRecord(final String tableName) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
index 1c13d4e..f5f315e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.core.fixture;
 
-import com.google.common.collect.Maps;
 import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
@@ -45,7 +44,7 @@ public final class FixtureDataConsistencyChecker extends AbstractDataConsistency
     
     @Override
     protected AbstractSQLBuilder getSqlBuilder() {
-        return new AbstractSQLBuilder(Maps.newHashMap()) {
+        return new AbstractSQLBuilder() {
             @Override
             protected String getLeftIdentifierQuoteString() {
                 return "`";
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index b9eb885..1bb170c 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -138,7 +138,10 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
             for (int j = 0; j < beforeValues.length; j++) {
                 Object oldValue = beforeValues[j];
                 Object newValue = afterValues[j];
-                record.addColumn(new Column(tableMetaData.getColumnMetaData(j).getName(), newValue, !Objects.equals(newValue, oldValue), tableMetaData.isPrimaryKey(j)));
+                boolean updated = !Objects.equals(newValue, oldValue);
+                record.addColumn(new Column(tableMetaData.getColumnMetaData(j).getName(),
+                        (tableMetaData.isPrimaryKey(j) && updated) ? oldValue : null,
+                        newValue, updated, tableMetaData.isPrimaryKey(j)));
             }
             pushRecord(record);
         }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
index fb44038..41f9e61 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.mysql;
 
-import com.google.common.collect.Maps;
 import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
@@ -99,6 +98,6 @@ public final class MySQLDataConsistencyChecker extends AbstractDataConsistencyCh
     
     @Override
     protected MySQLSQLBuilder getSqlBuilder() {
-        return new MySQLSQLBuilder(Maps.newHashMap());
+        return new MySQLSQLBuilder();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
index 97a971a..a22f5ac 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
@@ -22,9 +22,6 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
 
-import java.util.Map;
-import java.util.Set;
-
 /**
  * MySQL importer.
  */
@@ -35,7 +32,7 @@ public final class MySQLImporter extends AbstractJDBCImporter {
     }
     
     @Override
-    protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        return new MySQLSQLBuilder(shardingColumnsMap);
+    protected AbstractSQLBuilder createSQLBuilder() {
+        return new MySQLSQLBuilder();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
index 5da75de..b7cd578 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
@@ -18,24 +18,14 @@
 package org.apache.shardingsphere.scaling.mysql;
 
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * MySQL SQL builder.
  */
 public final class MySQLSQLBuilder extends AbstractSQLBuilder {
     
-    public MySQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        super(shardingColumnsMap);
-    }
-    
     @Override
     public String getLeftIdentifierQuoteString() {
         return "`";
@@ -47,19 +37,20 @@ public final class MySQLSQLBuilder extends AbstractSQLBuilder {
     }
     
     @Override
-    protected PreparedSQL buildInsertSQLInternal(final DataRecord dataRecord) {
-        PreparedSQL preparedSQL = super.buildInsertSQLInternal(dataRecord);
-        StringBuilder insertSQL = new StringBuilder(preparedSQL.getSql() + " ON DUPLICATE KEY UPDATE ");
-        List<Integer> valuesIndex = new ArrayList<>(preparedSQL.getValuesIndex());
+    public String buildInsertSQL(final DataRecord dataRecord) {
+        return super.buildInsertSQL(dataRecord) + buildDuplicateUpdateSQL(dataRecord);
+    }
+    
+    private String buildDuplicateUpdateSQL(final DataRecord dataRecord) {
+        StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
             if (!dataRecord.getColumn(i).isPrimaryKey()) {
-                insertSQL.append(quote(column.getName())).append("=?,");
-                valuesIndex.add(i);
+                result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
             }
         }
-        insertSQL.setLength(insertSQL.length() - 1);
-        return new PreparedSQL(insertSQL.toString(), valuesIndex);
+        result.setLength(result.length() - 1);
+        return result.toString();
     }
     
     /**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
index 8b89a63..b1e950d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
@@ -17,14 +17,11 @@
 
 package org.apache.shardingsphere.scaling.mysql;
 
-import com.google.common.collect.Maps;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
-import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -45,9 +42,8 @@ public final class MySQLImporterTest {
     @Test
     public void assertCreateSqlBuilder() {
         MySQLImporter mySQLImporter = new MySQLImporter(importerConfig, dataSourceManager);
-        PreparedSQL insertSQL = mySQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
-        assertThat(insertSQL.getSql(), is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=?"));
-        assertThat(insertSQL.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1, 1));
+        String insertSQL = mySQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
+        assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`) VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=VALUES(`name`)"));
     }
     
     private DataRecord mockDataRecord() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilderTest.java
similarity index 57%
rename from shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilderTest.java
index 12adaa5..e74fe7e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilderTest.java
@@ -17,32 +17,33 @@
 
 package org.apache.shardingsphere.scaling.mysql;
 
-import com.google.common.collect.Maps;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
+import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
-import org.hamcrest.Matchers;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-public final class MySQLSqlBuilderTest {
+public class MySQLSQLBuilderTest {
+    
+    private AbstractSQLBuilder sqlBuilder = new MySQLSQLBuilder();
     
     @Test
     public void assertBuildInsertSQL() {
-        PreparedSQL actual = new MySQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
-        assertThat(actual.getSql(), is("INSERT INTO `t_order`(`id`,`name`,`age`) VALUES(?,?,?) ON DUPLICATE KEY UPDATE `name`=?,`age`=?"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1, 2, 1, 2));
+        String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+        assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE `sc`=VALUES(`sc`),`c1`=VALUES(`c1`),`c2`=VALUES(`c2`),`c3`=VALUES(`c3`)"));
     }
     
-    private DataRecord mockDataRecord() {
-        DataRecord result = new DataRecord(new BinlogPosition("", 1), 2);
-        result.setTableName("t_order");
-        result.addColumn(new Column("id", 1, true, true));
-        result.addColumn(new Column("name", "", true, false));
-        result.addColumn(new Column("age", 1, true, false));
+    private DataRecord mockDataRecord(final String tableName) {
+        DataRecord result = new DataRecord(new NopPosition(), 4);
+        result.setTableName(tableName);
+        result.addColumn(new Column("id", "", false, true));
+        result.addColumn(new Column("sc", "", false, false));
+        result.addColumn(new Column("c1", "", true, false));
+        result.addColumn(new Column("c2", "", true, false));
+        result.addColumn(new Column("c3", "", true, false));
         return result;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
index df1d0c3..97f2712 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
-import com.google.common.collect.Maps;
 import org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
@@ -42,6 +41,6 @@ public final class PostgreSQLDataConsistencyChecker extends AbstractDataConsiste
     
     @Override
     protected AbstractSQLBuilder getSqlBuilder() {
-        return new PostgreSQLSQLBuilder(Maps.newHashMap());
+        return new PostgreSQLSQLBuilder();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
index 8faa00c..0422d4b 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
@@ -22,9 +22,6 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
 import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
 
-import java.util.Map;
-import java.util.Set;
-
 /**
  * postgreSQL importer.
  */
@@ -35,8 +32,8 @@ public final class PostgreSQLImporter extends AbstractJDBCImporter {
     }
     
     @Override
-    protected AbstractSQLBuilder createSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        return new PostgreSQLSQLBuilder(shardingColumnsMap);
+    protected AbstractSQLBuilder createSQLBuilder() {
+        return new PostgreSQLSQLBuilder();
     }
 }
 
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
index 488a09e..e59fb36 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
@@ -17,23 +17,16 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-
-import java.util.Map;
-import java.util.Set;
+import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
 
 /**
  * PostgreSQL SQL builder.
  */
 public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
     
-    public PostgreSQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
-        super(shardingColumnsMap);
-    }
-    
     @Override
     public String getLeftIdentifierQuoteString() {
         return "\"";
@@ -45,15 +38,14 @@ public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
     }
     
     @Override
-    public PreparedSQL buildInsertSQL(final DataRecord dataRecord) {
-        PreparedSQL preparedSQL = super.buildInsertSQL(dataRecord);
-        return new PreparedSQL(preparedSQL.getSql() + buildConflictSQL(dataRecord), preparedSQL.getValuesIndex());
+    public String buildInsertSQL(final DataRecord dataRecord) {
+        return super.buildInsertSQL(dataRecord) + buildConflictSQL(dataRecord);
     }
     
     private String buildConflictSQL(final DataRecord dataRecord) {
         StringBuilder result = new StringBuilder(" ON CONFLICT (");
-        for (Integer each : RecordUtil.extractPrimaryColumns(dataRecord)) {
-            result.append(dataRecord.getColumn(each).getName()).append(",");
+        for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
+            result.append(each.getName()).append(",");
         }
         result.setLength(result.length() - 1);
         result.append(") DO NOTHING");
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
index 19a1bf9..f7b4e68 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
@@ -17,14 +17,11 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
-import com.google.common.collect.Maps;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
-import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -46,9 +43,8 @@ public final class PostgreSQLImporterTest {
     @Test
     public void assertCreateSQLBuilder() {
         PostgreSQLImporter postgreSQLImporter = new PostgreSQLImporter(importerConfig, dataSourceManager);
-        PreparedSQL insertSQL = postgreSQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
-        assertThat(insertSQL.getSql(), is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
-        assertThat(insertSQL.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1));
+        String insertSQL = postgreSQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
+        assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
     }
     
     private DataRecord mockDataRecord() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
index 885c5f4..3f14f4c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
@@ -17,12 +17,9 @@
 
 package org.apache.shardingsphere.scaling.postgresql;
 
-import com.google.common.collect.Maps;
-import org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
-import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.postgresql.replication.LogSequenceNumber;
 
@@ -33,9 +30,8 @@ public final class PostgreSQLSqlBuilderTest {
     
     @Test
     public void assertBuildInsertSQL() {
-        PreparedSQL actual = new PostgreSQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
-        assertThat(actual.getSql(), is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
-        assertThat(actual.getValuesIndex().toArray(), Matchers.arrayContaining(0, 1));
+        String actual = new PostgreSQLSQLBuilder().buildInsertSQL(mockDataRecord());
+        assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
     }
     
     private DataRecord mockDataRecord() {