Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/19 05:21:01 UTC

[shardingsphere] branch master updated: Supports scaling MySQL table which only contains unique index (#17786)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e474fb182 Supports scaling MySQL table which only contains unique index (#17786)
c3e474fb182 is described below

commit c3e474fb182634c0685f24c6cb77d6d6135ff378
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Thu May 19 13:20:55 2022 +0800

    Supports scaling MySQL table which only contains unique index (#17786)
    
    * Load indexes in PipelineTableMetaDataLoader
    
    * Load unique indexes only
    
    * Add nullable into PipelineColumnMetaData
    
    * Supports scaling table which only contains unique index
    
    * Fix checkstyle
    
    * Fix PipelineTableMetaDataTest
    
    * Complete InventoryTaskSplitterTest
    
    * Complete PipelineTableMetaDataLoaderTest
---
 .../ingest/InventoryDumperConfiguration.java       |  3 +-
 .../data/pipeline/api/ingest/record/Column.java    |  6 +--
 .../pipeline/api/ingest/record/DataRecord.java     | 18 ++++----
 .../pipeline/core/importer/AbstractImporter.java   |  2 +-
 .../pipeline/core/importer/DataRecordMerger.java   |  8 ++--
 .../ingest/dumper/AbstractInventoryDumper.java     | 14 +++----
 .../loader/PipelineTableMetaDataLoader.java        | 28 ++++++++++++-
 .../metadata/model/PipelineColumnMetaData.java     |  2 +
 .../metadata/model/PipelineIndexMetaData.java}     | 28 +++----------
 .../core/metadata/model/PipelineTableMetaData.java | 19 ++++++---
 .../data/pipeline/core/record/RecordUtil.java      |  4 +-
 .../rulealtered/prepare/InventoryTaskSplitter.java | 49 ++++++++++++++--------
 .../mysql/ingest/MySQLIncrementalDumper.java       |  4 +-
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |  2 +-
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |  2 +-
 .../postgresql/ingest/wal/WalEventConverter.java   |  7 ++--
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |  2 +-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  2 +-
 .../loader/PipelineTableMetaDataLoaderTest.java    | 31 ++++++++++++--
 .../metadata/model/PipelineTableMetaDataTest.java  | 17 ++++----
 .../data/pipeline/core/task/InventoryTaskTest.java |  2 +-
 .../prepare/InventoryTaskSplitterTest.java         | 21 +++++++++-
 22 files changed, 173 insertions(+), 98 deletions(-)

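For reference, the table shape this change newly supports is one with no primary key but a single-column unique
index on a NOT NULL column; tables with a composite primary key, several unique indexes, or a unique index with
nullable or multiple columns are still rejected when splitting inventory tasks. A minimal sketch of such a table,
mirroring the environment the new InventoryTaskSplitterTest sets up (the DataSource is assumed to point at the
source database; the method name is illustrative, not part of the patch):

    // Sketch only. Requires javax.sql.DataSource and java.sql.{Connection, SQLException, Statement}.
    private void createTableWithUniqueIndexOnly(final DataSource dataSource) throws SQLException {
        try (
                Connection connection = dataSource.getConnection();
                Statement statement = connection.createStatement()) {
            // No PRIMARY KEY at all -- only a unique index on a NOT NULL column.
            statement.execute("CREATE TABLE t_order (order_id INT NOT NULL, user_id VARCHAR(12))");
            statement.execute("CREATE UNIQUE INDEX unique_order_id ON t_order (order_id)");
        }
    }
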
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 1d5635aec6c..c1f33e2cc54 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -35,8 +35,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
     
     private String logicTableName;
     
-    // TODO rename to uniqueKey
-    private String primaryKey;
+    private String uniqueKey;
     
     private Integer uniqueKeyDataType;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
index b1417b4b814..24a25b8a765 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
@@ -38,10 +38,10 @@ public final class Column {
     
     private final boolean updated;
     
-    private final boolean primaryKey;
+    private final boolean uniqueKey;
     
-    public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
-        this(name, null, value, updated, primaryKey);
+    public Column(final String name, final Object value, final boolean updated, final boolean uniqueKey) {
+        this(name, null, value, updated, uniqueKey);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index 19ad081cc5d..126b8ee17e9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -33,15 +33,15 @@ import java.util.List;
  */
 @Getter
 @Setter
-@EqualsAndHashCode(of = {"tableName", "primaryKeyValue"}, callSuper = false)
+@EqualsAndHashCode(of = {"tableName", "uniqueKeyValue"}, callSuper = false)
 @ToString
 public final class DataRecord extends Record {
     
     private final List<Column> columns;
     
-    private final List<Object> primaryKeyValue = new LinkedList<>();
+    private final List<Object> uniqueKeyValue = new LinkedList<>();
     
-    private final List<Object> oldPrimaryKeyValues = new ArrayList<>();
+    private final List<Object> oldUniqueKeyValues = new ArrayList<>();
     
     private String type;
     
@@ -59,9 +59,9 @@ public final class DataRecord extends Record {
      */
     public void addColumn(final Column data) {
         columns.add(data);
-        if (data.isPrimaryKey()) {
-            primaryKeyValue.add(data.getValue());
-            oldPrimaryKeyValues.add(data.getOldValue());
+        if (data.isUniqueKey()) {
+            uniqueKeyValue.add(data.getValue());
+            oldUniqueKeyValues.add(data.getOldValue());
         }
     }
     
@@ -90,7 +90,7 @@ public final class DataRecord extends Record {
      * @return key
      */
     public Key getKey() {
-        return new Key(tableName, primaryKeyValue);
+        return new Key(tableName, uniqueKeyValue);
     }
     
     /**
@@ -99,7 +99,7 @@ public final class DataRecord extends Record {
      * @return key
      */
     public Key getOldKey() {
-        return new Key(tableName, oldPrimaryKeyValues);
+        return new Key(tableName, oldUniqueKeyValues);
     }
     
     @EqualsAndHashCode
@@ -108,6 +108,6 @@ public final class DataRecord extends Record {
         
         private final String tableName;
         
-        private final List<Object> primaryKeyValues;
+        private final List<Object> uniqueKeyValues;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index 7b8cd297293..c606d4ef103 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -189,7 +189,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
             }
             for (int i = 0; i < conditionColumns.size(); i++) {
                 Column keyColumn = conditionColumns.get(i);
-                ps.setObject(updatedColumns.size() + i + 1, (keyColumn.isPrimaryKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
+                ps.setObject(updatedColumns.size() + i + 1, (keyColumn.isUniqueKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
             }
             int updateCount = ps.executeUpdate();
             if (1 != updateCount) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
index 5418794d6fc..632a482fe5b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
@@ -129,11 +129,11 @@ public final class DataRecordMerger {
             for (int i = 0; i < dataRecord.getColumnCount(); i++) {
                 mergedDataRecord.addColumn(new Column(
                         dataRecord.getColumn(i).getName(),
-                        dataRecord.getColumn(i).isPrimaryKey()
+                        dataRecord.getColumn(i).isUniqueKey()
                                 ? beforeDataRecord.getColumn(i).getOldValue()
                                 : beforeDataRecord.getColumn(i).getValue(),
                         true,
-                        dataRecord.getColumn(i).isPrimaryKey()));
+                        dataRecord.getColumn(i).isUniqueKey()));
             }
             mergedDataRecord.setTableName(dataRecord.getTableName());
             mergedDataRecord.setType(IngestDataChangeType.DELETE);
@@ -153,12 +153,12 @@ public final class DataRecordMerger {
         for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
             result.addColumn(new Column(
                     curDataRecord.getColumn(i).getName(),
-                    preDataRecord.getColumn(i).isPrimaryKey()
+                    preDataRecord.getColumn(i).isUniqueKey()
                             ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
                             : null,
                     curDataRecord.getColumn(i).getValue(),
                     preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
-                    curDataRecord.getColumn(i).isPrimaryKey()));
+                    curDataRecord.getColumn(i).isUniqueKey()));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 32afdf05012..654aa1d7d3d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -106,8 +106,8 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     private void dump() {
         String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
         int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
-        String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), uniqueKeyDataType, true);
-        String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), uniqueKeyDataType, false);
+        String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, true);
+        String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, false);
         IngestPosition<?> position = dumperConfig.getPosition();
         log.info("inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}", uniqueKeyDataType, firstSQL, laterSQL, position);
         if (position instanceof FinishedPosition) {
@@ -167,12 +167,12 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
                     record.setType(IngestDataChangeType.INSERT);
                     record.setTableName(logicTableName);
                     for (int i = 1; i <= metaData.getColumnCount(); i++) {
-                        boolean isPrimaryKey = tableMetaData.isPrimaryKey(i - 1);
+                        boolean isUniqueKey = tableMetaData.isUniqueKey(i - 1);
                         Object value = readValue(resultSet, i);
-                        if (isPrimaryKey) {
+                        if (isUniqueKey) {
                             maxUniqueKeyValue = value;
                         }
-                        record.addColumn(new Column(metaData.getColumnName(i), value, true, isPrimaryKey));
+                        record.addColumn(new Column(metaData.getColumnName(i), value, true, isUniqueKey));
                     }
                     pushRecord(record);
                     rowCount++;
@@ -198,8 +198,8 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     }
     
     private IngestPosition<?> newPosition(final ResultSet rs) throws SQLException {
-        return null == dumperConfig.getPrimaryKey() ? new PlaceholderPosition()
-                : PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getPrimaryKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
+        return null == dumperConfig.getUniqueKey() ? new PlaceholderPosition()
+                : PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getUniqueKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
     }
     
     protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
index 1f35beda681..75c762edbd9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
@@ -22,17 +22,22 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -108,17 +113,36 @@ public final class PipelineTableMetaDataLoader {
                     throw ex;
                 }
                 boolean primaryKey = primaryKeys.contains(columnName);
-                PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, primaryKey);
+                boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
+                PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, isNullable, primaryKey);
                 columnMetaDataMap.put(columnName, columnMetaData);
             }
         }
         Map<TableName, PipelineTableMetaData> result = new LinkedHashMap<>();
         for (Entry<String, Map<String, PipelineColumnMetaData>> entry : tablePipelineColumnMetaDataMap.entrySet()) {
-            result.put(new TableName(entry.getKey()), new PipelineTableMetaData(entry.getKey(), entry.getValue()));
+            String tableName = entry.getKey();
+            result.put(new TableName(tableName), new PipelineTableMetaData(tableName, entry.getValue(), loadIndexesOfTable(connection, schemaName, entry.getValue(), tableName)));
         }
         return result;
     }
     
+    private Collection<PipelineIndexMetaData> loadIndexesOfTable(final Connection connection, final String schemaName, final Map<String, PipelineColumnMetaData> columns,
+                                                                 final String tableName) throws SQLException {
+        Map<String, PipelineIndexMetaData> result = new LinkedHashMap<>();
+        Map<String, SortedMap<Short, String>> orderedColumnsOfIndexes = new LinkedHashMap<>();
+        try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, false)) {
+            while (resultSet.next()) {
+                String indexName = resultSet.getString("INDEX_NAME");
+                result.computeIfAbsent(indexName, unused -> new PipelineIndexMetaData(indexName, new LinkedList<>()));
+                orderedColumnsOfIndexes.computeIfAbsent(indexName, unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
+            }
+        }
+        for (PipelineIndexMetaData each : result.values()) {
+            orderedColumnsOfIndexes.get(each.getName()).values().stream().map(columns::get).forEach(each.getColumns()::add);
+        }
+        return result.values();
+    }
+    
     private Set<String> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
         Set<String> result = new LinkedHashSet<>();
         // TODO order primary keys
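
The new loadIndexesOfTable above is built on the standard JDBC DatabaseMetaData.getIndexInfo call with unique=true,
grouping the returned rows by INDEX_NAME and restoring the column order of composite indexes via ORDINAL_POSITION.
A standalone sketch of that pattern, kept separate from the pipeline classes (the method name and the null-name
guard are illustrative additions, not part of the committed code):

    // Sketch only. Requires java.sql.{Connection, ResultSet, SQLException} and java.util.{LinkedHashMap, Map, SortedMap, TreeMap}.
    private static Map<String, SortedMap<Short, String>> loadUniqueIndexColumns(final Connection connection,
                                                                                final String schema, final String table) throws SQLException {
        Map<String, SortedMap<Short, String>> result = new LinkedHashMap<>();
        // unique=true restricts the result to unique indexes; approximate=false asks for exact data.
        try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schema, table, true, false)) {
            while (resultSet.next()) {
                String indexName = resultSet.getString("INDEX_NAME");
                if (null == indexName) {
                    // Some drivers return a statistics row without an index name; skip it.
                    continue;
                }
                // Each column of a composite index arrives as its own row; keep them sorted by position.
                result.computeIfAbsent(indexName, unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
            }
        }
        return result;
    }
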
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
index 6d6e5cd1b73..3202dbdbeeb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
@@ -41,6 +41,8 @@ public final class PipelineColumnMetaData implements Comparable<PipelineColumnMe
     
     private final String dataTypeName;
     
+    private final boolean nullable;
+    
     private final boolean primaryKey;
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
similarity index 60%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
index b1417b4b814..3eeb373bf27 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
@@ -15,37 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.ingest.record;
+package org.apache.shardingsphere.data.pipeline.core.metadata.model;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
+import java.util.List;
+
 /**
- * Column.
+ * Pipeline meta data of index.
  */
 @RequiredArgsConstructor
 @Getter
-public final class Column {
+public final class PipelineIndexMetaData {
     
     private final String name;
     
-    /**
-     * Value are available only when the primary key column is updated.
-     */
-    private final Object oldValue;
-    
-    private final Object value;
-    
-    private final boolean updated;
-    
-    private final boolean primaryKey;
-    
-    public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
-        this(name, null, value, updated, primaryKey);
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("%s=%s", name, value);
-    }
+    private final List<PipelineColumnMetaData> columns;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index aef6719320a..86c99c7ea2a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -23,6 +23,7 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -47,7 +48,10 @@ public final class PipelineTableMetaData {
     @Getter
     private final List<String> primaryKeyColumns;
     
-    public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap) {
+    @Getter
+    private final Collection<PipelineIndexMetaData> uniqueIndexes;
+    
+    public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap, final Collection<PipelineIndexMetaData> uniqueIndexes) {
         this.name = name;
         this.columnMetaDataMap = columnMetaDataMap;
         List<PipelineColumnMetaData> columnMetaDataList = new ArrayList<>(columnMetaDataMap.values());
@@ -55,6 +59,7 @@ public final class PipelineTableMetaData {
         columnNames = Collections.unmodifiableList(columnMetaDataList.stream().map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
         primaryKeyColumns = Collections.unmodifiableList(columnMetaDataList.stream().filter(PipelineColumnMetaData::isPrimaryKey)
                 .map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
+        this.uniqueIndexes = Collections.unmodifiableCollection(uniqueIndexes);
     }
     
     /**
@@ -82,13 +87,17 @@ public final class PipelineTableMetaData {
     }
     
     /**
-     * Judge whether column is primary key or not.
+     * Judge whether column is unique key or not.
      *
      * @param columnIndex column index
-     * @return true if the column is primary key, otherwise false
+     * @return true if the column is unique key, otherwise false
      */
-    public boolean isPrimaryKey(final int columnIndex) {
-        return columnIndex < columnNames.size() && columnMetaDataMap.get(columnNames.get(columnIndex)).isPrimaryKey();
+    public boolean isUniqueKey(final int columnIndex) {
+        if (columnIndex >= columnNames.size()) {
+            return false;
+        }
+        String columnName = columnNames.get(columnIndex);
+        return columnMetaDataMap.get(columnName).isPrimaryKey() || (columnName.equals(uniqueIndexes.iterator().next().getColumns().get(0).getName()));
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
index f623da6f196..02d8146909a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
@@ -41,7 +41,7 @@ public final class RecordUtil {
     public static List<Column> extractPrimaryColumns(final DataRecord dataRecord) {
         List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
         for (Column each : dataRecord.getColumns()) {
-            if (each.isPrimaryKey()) {
+            if (each.isUniqueKey()) {
                 result.add(each);
             }
         }
@@ -58,7 +58,7 @@ public final class RecordUtil {
     public static List<Column> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
         List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
         for (Column each : dataRecord.getColumns()) {
-            if (each.isPrimaryKey() || shardingColumns.contains(each.getName())) {
+            if (each.isUniqueKey() || shardingColumns.contains(each.getName())) {
                 result.add(each);
             }
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 593e3b47979..ed3b1d8a2d7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -35,6 +35,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreatio
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -118,7 +120,7 @@ public final class InventoryTaskSplitter {
             splitDumperConfig.setShardingItem(i++);
             splitDumperConfig.setActualTableName(dumperConfig.getActualTableName());
             splitDumperConfig.setLogicTableName(dumperConfig.getLogicTableName());
-            splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
+            splitDumperConfig.setUniqueKey(dumperConfig.getUniqueKey());
             splitDumperConfig.setUniqueKeyDataType(dumperConfig.getUniqueKeyDataType());
             splitDumperConfig.setBatchSize(batchSize);
             splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
@@ -133,51 +135,62 @@ public final class InventoryTaskSplitter {
         String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
         String actualTableName = dumperConfig.getActualTableName();
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, actualTableName);
+        PipelineColumnMetaData uniqueKeyColumn = mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName);
         if (null != initProgress && initProgress.getStatus() != JobStatus.PREPARING_FAILURE) {
             Collection<IngestPosition<?>> result = initProgress.getInventoryPosition(dumperConfig.getActualTableName()).values();
             for (IngestPosition<?> each : result) {
                 if (each instanceof PrimaryKeyPosition) {
-                    String primaryKey = tableMetaData.getPrimaryKeyColumns().get(0);
-                    dumperConfig.setPrimaryKey(primaryKey);
-                    dumperConfig.setUniqueKeyDataType(tableMetaData.getColumnMetaData(primaryKey).getDataType());
+                    dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+                    dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
                     break;
                 }
             }
             // Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
             return result;
         }
-        checkPrimaryKey(tableMetaData, actualTableName);
-        String primaryKey = tableMetaData.getPrimaryKeyColumns().get(0);
-        dumperConfig.setPrimaryKey(primaryKey);
-        int primaryKeyDataType = tableMetaData.getColumnMetaData(primaryKey).getDataType();
-        dumperConfig.setUniqueKeyDataType(primaryKeyDataType);
-        if (PipelineJdbcUtils.isIntegerColumn(primaryKeyDataType)) {
+        dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+        int uniqueKeyDataType = uniqueKeyColumn.getDataType();
+        dumperConfig.setUniqueKeyDataType(uniqueKeyDataType);
+        if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
             return getPositionByIntegerPrimaryKeyRange(jobContext, dataSource, dumperConfig);
-        } else if (PipelineJdbcUtils.isStringColumn(primaryKeyDataType)) {
+        } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
             return getPositionByStringPrimaryKeyRange();
         } else {
             throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is not integer or string type", actualTableName));
         }
     }
     
-    private void checkPrimaryKey(final PipelineTableMetaData tableMetaData, final String tableName) {
+    private PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
         if (null == tableMetaData) {
             throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: can not get table metadata ", tableName));
         }
         List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
-        if (null == primaryKeys || primaryKeys.isEmpty()) {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key", tableName));
-        }
         if (primaryKeys.size() > 1) {
             throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is union primary", tableName));
         }
+        if (1 == primaryKeys.size()) {
+            return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+        }
+        Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
+        if (uniqueIndexes.isEmpty()) {
+            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key or unique index", tableName));
+        }
+        if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
+            PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
+            if (!column.isNullable()) {
+                return column;
+            }
+            
+        }
+        throw new PipelineJobCreationException(
+                String.format("Can not split range for table %s, reason: table contains multiple unique index or unique index contains nullable/multiple column(s)", tableName));
     }
     
     private Collection<IngestPosition<?>> getPositionByIntegerPrimaryKeyRange(final RuleAlteredJobContext jobContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
         Collection<IngestPosition<?>> result = new LinkedList<>();
         RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
         String sql = PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType())
-                .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey());
+                .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), dumperConfig.getUniqueKey());
         int shardingSize = jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getInput().getShardingSize();
         try (
                 Connection connection = dataSource.getConnection();
@@ -194,7 +207,7 @@ public final class InventoryTaskSplitter {
                     }
                     long endId = rs.getLong(1);
                     if (endId == 0) {
-                        log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), beginId);
+                        log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), beginId);
                         break;
                     }
                     result.add(new IntegerPrimaryKeyPosition(beginId, endId));
@@ -206,7 +219,7 @@ public final class InventoryTaskSplitter {
                 result.add(new IntegerPrimaryKeyPosition(0, 0));
             }
         } catch (final SQLException ex) {
-            throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey()), ex);
+            throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey()), ex);
         }
         return result;
     }
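
The rules in mustGetAnAppropriateUniqueKeyColumn above boil down to: a single-column primary key is preferred;
failing that, exactly one unique index with exactly one non-nullable column is accepted; anything else aborts job
creation. The nullability check matters presumably because range predicates such as key > ? never match rows whose
key is NULL, so such rows could be skipped during inventory dumping. A condensed restatement using the metadata
types from this commit (the method name and the IllegalStateException stand-in are illustrative):

    // Sketch only; mirrors the selection rules above. Requires java.util.{Collection, List} and the pipeline metadata classes.
    private static PipelineColumnMetaData chooseUniqueKeyColumn(final PipelineTableMetaData tableMetaData) {
        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
        if (1 == primaryKeys.size()) {
            return tableMetaData.getColumnMetaData(primaryKeys.get(0));
        }
        Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
        if (primaryKeys.isEmpty() && 1 == uniqueIndexes.size()) {
            List<PipelineColumnMetaData> indexColumns = uniqueIndexes.iterator().next().getColumns();
            if (1 == indexColumns.size() && !indexColumns.get(0).isNullable()) {
                return indexColumns.get(0);
            }
        }
        // Stand-in for PipelineJobCreationException in the actual splitter.
        throw new IllegalStateException("no single-column, non-nullable unique key available");
    }
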
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 65ab8bc84fb..1adf68bc201 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -131,7 +131,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
             record.setType(IngestDataChangeType.INSERT);
             for (int i = 0; i < each.length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i);
-                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isPrimaryKey()));
+                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.isUniqueKey(i)));
             }
             pushRecord(record);
         }
@@ -168,7 +168,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
             record.setType(IngestDataChangeType.DELETE);
             for (int i = 0, length = each.length; i < length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i);
-                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isPrimaryKey()));
+                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.isUniqueKey(i)));
             }
             pushRecord(record);
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index a50a48c28a4..870f676e3ff 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -50,7 +50,7 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
         StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
-            if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
+            if (column.isUniqueKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
                 continue;
             }
             result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index a56c56781a8..9bd08e7ad72 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -54,7 +54,7 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
     
     @Override
     public List<Column> extractUpdatedColumns(final DataRecord record, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
-        return record.getColumns().stream().filter(each -> !(each.isPrimaryKey() || isShardingColumn(shardingColumnsMap, record.getTableName(), each.getName()))).collect(Collectors.toList());
+        return record.getColumns().stream().filter(each -> !(each.isUniqueKey() || isShardingColumn(shardingColumnsMap, record.getTableName(), each.getName()))).collect(Collectors.toList());
     }
     
     private String buildConflictSQL(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index e3403ae3061..cc9e26117d4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -110,6 +110,7 @@ public final class WalEventConverter {
         // TODO completion columns
         DataRecord result = createDataRecord(event, event.getPrimaryKeys().size());
         result.setType(IngestDataChangeType.DELETE);
+        // TODO Unique key may be a column within unique index
         List<String> primaryKeyColumns = getPipelineTableMetaData(event.getTableName()).getPrimaryKeyColumns();
         for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
             result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), true, true));
@@ -125,9 +126,9 @@ public final class WalEventConverter {
     
     private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
         for (int i = 0, count = values.size(); i < count; i++) {
-            boolean isPrimaryKey = tableMetaData.isPrimaryKey(i);
-            Object primaryKeyOldValue = isPrimaryKey ? values.get(i) : null;
-            Column column = new Column(tableMetaData.getColumnMetaData(i).getName(), primaryKeyOldValue, values.get(i), true, isPrimaryKey);
+            boolean isUniqueKey = tableMetaData.isUniqueKey(i);
+            Object uniqueKeyOldValue = isUniqueKey ? values.get(i) : null;
+            Column column = new Column(tableMetaData.getColumnMetaData(i).getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
             dataRecord.addColumn(column);
         }
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 937bb5bf070..e7fda9f2e84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -61,7 +61,7 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
         result.append(") DO UPDATE SET ");
         for (int i = 0; i < dataRecord.getColumnCount(); i++) {
             Column column = dataRecord.getColumn(i);
-            if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
+            if (column.isUniqueKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
                 continue;
             }
             result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 967adc6733b..c4891041c91 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -152,7 +152,7 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setPosition(new PlaceholderPosition());
         dumperConfig.setActualTableName("t_order");
         dumperConfig.setLogicTableName("t_order");
-        dumperConfig.setPrimaryKey("order_id");
+        dumperConfig.setUniqueKey("order_id");
         dumperConfig.setUniqueKeyDataType(Types.INTEGER);
         dumperConfig.setShardingItem(0);
         PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
index 3d0da54f2e3..c45df0c9d3b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
 
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.junit.Before;
@@ -33,6 +34,7 @@ import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Collection;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -52,12 +54,14 @@ public final class PipelineTableMetaDataLoaderTest {
     
     private static final String DATA_TYPE = "DATA_TYPE";
     
-    private static final String TYPE_NAME = "TYPE_NAME";
-    
     private static final String TABLE_NAME = "TABLE_NAME";
     
+    private static final String INDEX_NAME = "INDEX_NAME";
+    
     private static final String TEST_TABLE = "test";
     
+    private static final String TEST_INDEX = "idx_test";
+    
     private PipelineDataSourceWrapper dataSource;
     
     @Mock
@@ -72,6 +76,9 @@ public final class PipelineTableMetaDataLoaderTest {
     @Mock
     private ResultSet columnMetaDataResultSet;
     
+    @Mock
+    private ResultSet indexInfoResultSet;
+    
     @Before
     public void setUp() throws SQLException {
         DataSource rawDataSource = mock(DataSource.class);
@@ -88,13 +95,20 @@ public final class PipelineTableMetaDataLoaderTest {
         when(columnMetaDataResultSet.getInt(ORDINAL_POSITION)).thenReturn(1, 2, 3);
         when(columnMetaDataResultSet.getString(COLUMN_NAME)).thenReturn("id", "name", "age");
         when(columnMetaDataResultSet.getInt(DATA_TYPE)).thenReturn(Types.BIGINT, Types.VARCHAR, Types.INTEGER);
+        when(databaseMetaData.getIndexInfo(TEST_CATALOG, null, TEST_TABLE, true, false)).thenReturn(indexInfoResultSet);
+        when(indexInfoResultSet.next()).thenReturn(true, true, false);
+        when(indexInfoResultSet.getString(INDEX_NAME)).thenReturn(TEST_INDEX);
+        when(indexInfoResultSet.getString(COLUMN_NAME)).thenReturn("name", "id");
+        when(indexInfoResultSet.getShort(ORDINAL_POSITION)).thenReturn((short) 2, (short) 1);
     }
     
     @Test
     public void assertGetTableMetaData() {
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
-        assertColumnMetaData(metaDataLoader.getTableMetaData(null, TEST_TABLE));
-        assertPrimaryKeys(metaDataLoader.getTableMetaData(null, TEST_TABLE).getPrimaryKeyColumns());
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(null, TEST_TABLE);
+        assertColumnMetaData(tableMetaData);
+        assertPrimaryKeys(tableMetaData.getPrimaryKeyColumns());
+        assertIndexMetaData(tableMetaData.getUniqueIndexes());
     }
     
     private void assertPrimaryKeys(final List<String> actual) {
@@ -114,6 +128,15 @@ public final class PipelineTableMetaDataLoaderTest {
         assertThat(actual.getDataType(), is(expectedType));
     }
     
+    private void assertIndexMetaData(final Collection<PipelineIndexMetaData> actualUniqueIndexes) {
+        assertThat(actualUniqueIndexes.size(), is(1));
+        PipelineIndexMetaData actualIndexMetaData = actualUniqueIndexes.iterator().next();
+        assertThat(actualIndexMetaData.getName(), is(TEST_INDEX));
+        assertThat(actualIndexMetaData.getColumns().size(), is(2));
+        assertThat(actualIndexMetaData.getColumns().get(0).getName(), is("id"));
+        assertThat(actualIndexMetaData.getColumns().get(1).getName(), is("name"));
+    }
+    
     @Test(expected = RuntimeException.class)
     public void assertGetTableMetaDataFailure() throws SQLException {
         when(dataSource.getConnection()).thenThrow(new SQLException(""));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index bc76828ece1..6e153b5678d 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -20,14 +20,14 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.model;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.sql.Types;
+import java.util.Collections;
+
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-
-import java.sql.Types;
-import java.util.Collections;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public final class PipelineTableMetaDataTest {
     
@@ -35,7 +35,8 @@ public final class PipelineTableMetaDataTest {
     
     @Before
     public void setUp() {
-        pipelineTableMetaData = new PipelineTableMetaData("test_data", Collections.singletonMap("test", new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true)));
+        PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true);
+        pipelineTableMetaData = new PipelineTableMetaData("test_data", Collections.singletonMap("test", column), Collections.emptySet());
     }
     
     @Test
@@ -59,7 +60,7 @@ public final class PipelineTableMetaDataTest {
     
     @Test
     public void assertIsPrimaryKey() {
-        assertTrue(pipelineTableMetaData.isPrimaryKey(0));
-        assertFalse(pipelineTableMetaData.isPrimaryKey(1));
+        assertTrue(pipelineTableMetaData.isUniqueKey(0));
+        assertFalse(pipelineTableMetaData.isUniqueKey(1));
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 1fd66a11261..5bd098d8724 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -107,7 +107,7 @@ public final class InventoryTaskTest {
         InventoryDumperConfiguration result = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
         result.setLogicTableName(logicTableName);
         result.setActualTableName(actualTableName);
-        result.setPrimaryKey("order_id");
+        result.setUniqueKey("order_id");
         result.setUniqueKeyDataType(Types.INTEGER);
         result.setPosition(null == taskConfig.getDumperConfig().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperConfig().getPosition());
         return result;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index bce1657359f..7cd1f55f352 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -96,6 +96,13 @@ public final class InventoryTaskSplitterTest {
         inventoryTaskSplitter.splitInventoryData(jobContext);
     }
     
+    @Test
+    public void assertSplitInventoryDataWithoutPrimaryButWithUniqueIndex() throws SQLException {
+        initUniqueIndexOnNotNullColumnEnvironment(taskConfig.getDumperConfig());
+        List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
+        assertThat(actual.size(), is(1));
+    }
+    
     @Test(expected = PipelineJobCreationException.class)
     public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
@@ -103,7 +110,7 @@ public final class InventoryTaskSplitterTest {
     }
     
     @Test(expected = PipelineJobCreationException.class)
-    public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
+    public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
         inventoryTaskSplitter.splitInventoryData(jobContext);
     }
@@ -163,4 +170,16 @@ public final class InventoryTaskSplitterTest {
             statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
         }
     }
+    
+    private void initUniqueIndexOnNotNullColumnEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
+        DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS t_order");
+            statement.execute("CREATE TABLE t_order (order_id INT NOT NULL, user_id VARCHAR(12))");
+            statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+            statement.execute("CREATE UNIQUE INDEX unique_order_id ON t_order (order_id)");
+        }
+    }
 }