You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/19 05:21:01 UTC
[shardingsphere] branch master updated: Supports scaling MySQL table which only contains unique index (#17786)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c3e474fb182 Supports scaling MySQL table which only contains unique index (#17786)
c3e474fb182 is described below
commit c3e474fb182634c0685f24c6cb77d6d6135ff378
Author: 吴伟杰 <wu...@apache.org>
AuthorDate: Thu May 19 13:20:55 2022 +0800
Supports scaling MySQL table which only contains unique index (#17786)
* Load indexes in PipelineTableMetaDataLoader
* Load unique indexes only
* Add nullable into PipelineColumnMetaData
* Supports scaling table which only contains unique index
* Fix checkstyle
* Fix PipelineTableMetaDataTest
* Complete InventoryTaskSplitterTest
* Complete PipelineTableMetaDataLoaderTest
---
.../ingest/InventoryDumperConfiguration.java | 3 +-
.../data/pipeline/api/ingest/record/Column.java | 6 +--
.../pipeline/api/ingest/record/DataRecord.java | 18 ++++----
.../pipeline/core/importer/AbstractImporter.java | 2 +-
.../pipeline/core/importer/DataRecordMerger.java | 8 ++--
.../ingest/dumper/AbstractInventoryDumper.java | 14 +++----
.../loader/PipelineTableMetaDataLoader.java | 28 ++++++++++++-
.../metadata/model/PipelineColumnMetaData.java | 2 +
.../metadata/model/PipelineIndexMetaData.java} | 28 +++----------
.../core/metadata/model/PipelineTableMetaData.java | 19 ++++++---
.../data/pipeline/core/record/RecordUtil.java | 4 +-
.../rulealtered/prepare/InventoryTaskSplitter.java | 49 ++++++++++++++--------
.../mysql/ingest/MySQLIncrementalDumper.java | 4 +-
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 2 +-
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 2 +-
.../postgresql/ingest/wal/WalEventConverter.java | 7 ++--
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 2 +-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 2 +-
.../loader/PipelineTableMetaDataLoaderTest.java | 31 ++++++++++++--
.../metadata/model/PipelineTableMetaDataTest.java | 17 ++++----
.../data/pipeline/core/task/InventoryTaskTest.java | 2 +-
.../prepare/InventoryTaskSplitterTest.java | 21 +++++++++-
22 files changed, 173 insertions(+), 98 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 1d5635aec6c..c1f33e2cc54 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -35,8 +35,7 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
private String logicTableName;
- // TODO rename to uniqueKey
- private String primaryKey;
+ private String uniqueKey;
private Integer uniqueKeyDataType;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
index b1417b4b814..24a25b8a765 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
@@ -38,10 +38,10 @@ public final class Column {
private final boolean updated;
- private final boolean primaryKey;
+ private final boolean uniqueKey;
- public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
- this(name, null, value, updated, primaryKey);
+ public Column(final String name, final Object value, final boolean updated, final boolean uniqueKey) {
+ this(name, null, value, updated, uniqueKey);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index 19ad081cc5d..126b8ee17e9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -33,15 +33,15 @@ import java.util.List;
*/
@Getter
@Setter
-@EqualsAndHashCode(of = {"tableName", "primaryKeyValue"}, callSuper = false)
+@EqualsAndHashCode(of = {"tableName", "uniqueKeyValue"}, callSuper = false)
@ToString
public final class DataRecord extends Record {
private final List<Column> columns;
- private final List<Object> primaryKeyValue = new LinkedList<>();
+ private final List<Object> uniqueKeyValue = new LinkedList<>();
- private final List<Object> oldPrimaryKeyValues = new ArrayList<>();
+ private final List<Object> oldUniqueKeyValues = new ArrayList<>();
private String type;
@@ -59,9 +59,9 @@ public final class DataRecord extends Record {
*/
public void addColumn(final Column data) {
columns.add(data);
- if (data.isPrimaryKey()) {
- primaryKeyValue.add(data.getValue());
- oldPrimaryKeyValues.add(data.getOldValue());
+ if (data.isUniqueKey()) {
+ uniqueKeyValue.add(data.getValue());
+ oldUniqueKeyValues.add(data.getOldValue());
}
}
@@ -90,7 +90,7 @@ public final class DataRecord extends Record {
* @return key
*/
public Key getKey() {
- return new Key(tableName, primaryKeyValue);
+ return new Key(tableName, uniqueKeyValue);
}
/**
@@ -99,7 +99,7 @@ public final class DataRecord extends Record {
* @return key
*/
public Key getOldKey() {
- return new Key(tableName, oldPrimaryKeyValues);
+ return new Key(tableName, oldUniqueKeyValues);
}
@EqualsAndHashCode
@@ -108,6 +108,6 @@ public final class DataRecord extends Record {
private final String tableName;
- private final List<Object> primaryKeyValues;
+ private final List<Object> uniqueKeyValues;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index 7b8cd297293..c606d4ef103 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -189,7 +189,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
}
for (int i = 0; i < conditionColumns.size(); i++) {
Column keyColumn = conditionColumns.get(i);
- ps.setObject(updatedColumns.size() + i + 1, (keyColumn.isPrimaryKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
+ ps.setObject(updatedColumns.size() + i + 1, (keyColumn.isUniqueKey() && keyColumn.isUpdated()) ? keyColumn.getOldValue() : keyColumn.getValue());
}
int updateCount = ps.executeUpdate();
if (1 != updateCount) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
index 5418794d6fc..632a482fe5b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
@@ -129,11 +129,11 @@ public final class DataRecordMerger {
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
mergedDataRecord.addColumn(new Column(
dataRecord.getColumn(i).getName(),
- dataRecord.getColumn(i).isPrimaryKey()
+ dataRecord.getColumn(i).isUniqueKey()
? beforeDataRecord.getColumn(i).getOldValue()
: beforeDataRecord.getColumn(i).getValue(),
true,
- dataRecord.getColumn(i).isPrimaryKey()));
+ dataRecord.getColumn(i).isUniqueKey()));
}
mergedDataRecord.setTableName(dataRecord.getTableName());
mergedDataRecord.setType(IngestDataChangeType.DELETE);
@@ -153,12 +153,12 @@ public final class DataRecordMerger {
for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
result.addColumn(new Column(
curDataRecord.getColumn(i).getName(),
- preDataRecord.getColumn(i).isPrimaryKey()
+ preDataRecord.getColumn(i).isUniqueKey()
? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
: null,
curDataRecord.getColumn(i).getValue(),
preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
- curDataRecord.getColumn(i).isPrimaryKey()));
+ curDataRecord.getColumn(i).isUniqueKey()));
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 32afdf05012..654aa1d7d3d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -106,8 +106,8 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
private void dump() {
String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
- String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), uniqueKeyDataType, true);
- String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), uniqueKeyDataType, false);
+ String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, true);
+ String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, false);
IngestPosition<?> position = dumperConfig.getPosition();
log.info("inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}", uniqueKeyDataType, firstSQL, laterSQL, position);
if (position instanceof FinishedPosition) {
@@ -167,12 +167,12 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
record.setType(IngestDataChangeType.INSERT);
record.setTableName(logicTableName);
for (int i = 1; i <= metaData.getColumnCount(); i++) {
- boolean isPrimaryKey = tableMetaData.isPrimaryKey(i - 1);
+ boolean isUniqueKey = tableMetaData.isUniqueKey(i - 1);
Object value = readValue(resultSet, i);
- if (isPrimaryKey) {
+ if (isUniqueKey) {
maxUniqueKeyValue = value;
}
- record.addColumn(new Column(metaData.getColumnName(i), value, true, isPrimaryKey));
+ record.addColumn(new Column(metaData.getColumnName(i), value, true, isUniqueKey));
}
pushRecord(record);
rowCount++;
@@ -198,8 +198,8 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
}
private IngestPosition<?> newPosition(final ResultSet rs) throws SQLException {
- return null == dumperConfig.getPrimaryKey() ? new PlaceholderPosition()
- : PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getPrimaryKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
+ return null == dumperConfig.getUniqueKey() ? new PlaceholderPosition()
+ : PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getUniqueKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
}
protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
index 1f35beda681..75c762edbd9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
@@ -22,17 +22,22 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -108,17 +113,36 @@ public final class PipelineTableMetaDataLoader {
throw ex;
}
boolean primaryKey = primaryKeys.contains(columnName);
- PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, primaryKey);
+ boolean isNullable = "YES".equals(resultSet.getString("IS_NULLABLE"));
+ PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(ordinalPosition, columnName, dataType, dataTypeName, isNullable, primaryKey);
columnMetaDataMap.put(columnName, columnMetaData);
}
}
Map<TableName, PipelineTableMetaData> result = new LinkedHashMap<>();
for (Entry<String, Map<String, PipelineColumnMetaData>> entry : tablePipelineColumnMetaDataMap.entrySet()) {
- result.put(new TableName(entry.getKey()), new PipelineTableMetaData(entry.getKey(), entry.getValue()));
+ String tableName = entry.getKey();
+ result.put(new TableName(tableName), new PipelineTableMetaData(tableName, entry.getValue(), loadIndexesOfTable(connection, schemaName, entry.getValue(), tableName)));
}
return result;
}
+ private Collection<PipelineIndexMetaData> loadIndexesOfTable(final Connection connection, final String schemaName, final Map<String, PipelineColumnMetaData> columns,
+ final String tableName) throws SQLException {
+ Map<String, PipelineIndexMetaData> result = new LinkedHashMap<>();
+ Map<String, SortedMap<Short, String>> orderedColumnsOfIndexes = new LinkedHashMap<>();
+ try (ResultSet resultSet = connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, tableName, true, false)) {
+ while (resultSet.next()) {
+ String indexName = resultSet.getString("INDEX_NAME");
+ result.computeIfAbsent(indexName, unused -> new PipelineIndexMetaData(indexName, new LinkedList<>()));
+ orderedColumnsOfIndexes.computeIfAbsent(indexName, unused -> new TreeMap<>()).put(resultSet.getShort("ORDINAL_POSITION"), resultSet.getString("COLUMN_NAME"));
+ }
+ }
+ for (PipelineIndexMetaData each : result.values()) {
+ orderedColumnsOfIndexes.get(each.getName()).values().stream().map(columns::get).forEach(each.getColumns()::add);
+ }
+ return result.values();
+ }
+
private Set<String> loadPrimaryKeys(final Connection connection, final String schemaName, final String tableName) throws SQLException {
Set<String> result = new LinkedHashSet<>();
// TODO order primary keys
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
index 6d6e5cd1b73..3202dbdbeeb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
@@ -41,6 +41,8 @@ public final class PipelineColumnMetaData implements Comparable<PipelineColumnMe
private final String dataTypeName;
+ private final boolean nullable;
+
private final boolean primaryKey;
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
similarity index 60%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
index b1417b4b814..3eeb373bf27 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/Column.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
@@ -15,37 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.ingest.record;
+package org.apache.shardingsphere.data.pipeline.core.metadata.model;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import java.util.List;
+
/**
- * Column.
+ * Pipeline meta data of index.
*/
@RequiredArgsConstructor
@Getter
-public final class Column {
+public final class PipelineIndexMetaData {
private final String name;
- /**
- * Value are available only when the primary key column is updated.
- */
- private final Object oldValue;
-
- private final Object value;
-
- private final boolean updated;
-
- private final boolean primaryKey;
-
- public Column(final String name, final Object value, final boolean updated, final boolean primaryKey) {
- this(name, null, value, updated, primaryKey);
- }
-
- @Override
- public String toString() {
- return String.format("%s=%s", name, value);
- }
+ private final List<PipelineColumnMetaData> columns;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index aef6719320a..86c99c7ea2a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -23,6 +23,7 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -47,7 +48,10 @@ public final class PipelineTableMetaData {
@Getter
private final List<String> primaryKeyColumns;
- public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap) {
+ @Getter
+ private final Collection<PipelineIndexMetaData> uniqueIndexes;
+
+ public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap, final Collection<PipelineIndexMetaData> uniqueIndexes) {
this.name = name;
this.columnMetaDataMap = columnMetaDataMap;
List<PipelineColumnMetaData> columnMetaDataList = new ArrayList<>(columnMetaDataMap.values());
@@ -55,6 +59,7 @@ public final class PipelineTableMetaData {
columnNames = Collections.unmodifiableList(columnMetaDataList.stream().map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
primaryKeyColumns = Collections.unmodifiableList(columnMetaDataList.stream().filter(PipelineColumnMetaData::isPrimaryKey)
.map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
+ this.uniqueIndexes = Collections.unmodifiableCollection(uniqueIndexes);
}
/**
@@ -82,13 +87,17 @@ public final class PipelineTableMetaData {
}
/**
- * Judge whether column is primary key or not.
+ * Judge whether column is unique key or not.
*
* @param columnIndex column index
- * @return true if the column is primary key, otherwise false
+ * @return true if the column is unique key, otherwise false
*/
- public boolean isPrimaryKey(final int columnIndex) {
- return columnIndex < columnNames.size() && columnMetaDataMap.get(columnNames.get(columnIndex)).isPrimaryKey();
+ public boolean isUniqueKey(final int columnIndex) {
+ if (columnIndex >= columnNames.size()) {
+ return false;
+ }
+ String columnName = columnNames.get(columnIndex);
+ return columnMetaDataMap.get(columnName).isPrimaryKey() || (columnName.equals(uniqueIndexes.iterator().next().getColumns().get(0).getName()));
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
index f623da6f196..02d8146909a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
@@ -41,7 +41,7 @@ public final class RecordUtil {
public static List<Column> extractPrimaryColumns(final DataRecord dataRecord) {
List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
for (Column each : dataRecord.getColumns()) {
- if (each.isPrimaryKey()) {
+ if (each.isUniqueKey()) {
result.add(each);
}
}
@@ -58,7 +58,7 @@ public final class RecordUtil {
public static List<Column> extractConditionColumns(final DataRecord dataRecord, final Set<String> shardingColumns) {
List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
for (Column each : dataRecord.getColumns()) {
- if (each.isPrimaryKey() || shardingColumns.contains(each.getName())) {
+ if (each.isUniqueKey() || shardingColumns.contains(each.getName())) {
result.add(each);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 593e3b47979..ed3b1d8a2d7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -35,6 +35,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreatio
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -118,7 +120,7 @@ public final class InventoryTaskSplitter {
splitDumperConfig.setShardingItem(i++);
splitDumperConfig.setActualTableName(dumperConfig.getActualTableName());
splitDumperConfig.setLogicTableName(dumperConfig.getLogicTableName());
- splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
+ splitDumperConfig.setUniqueKey(dumperConfig.getUniqueKey());
splitDumperConfig.setUniqueKeyDataType(dumperConfig.getUniqueKeyDataType());
splitDumperConfig.setBatchSize(batchSize);
splitDumperConfig.setRateLimitAlgorithm(rateLimitAlgorithm);
@@ -133,51 +135,62 @@ public final class InventoryTaskSplitter {
String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, actualTableName);
+ PipelineColumnMetaData uniqueKeyColumn = mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName);
if (null != initProgress && initProgress.getStatus() != JobStatus.PREPARING_FAILURE) {
Collection<IngestPosition<?>> result = initProgress.getInventoryPosition(dumperConfig.getActualTableName()).values();
for (IngestPosition<?> each : result) {
if (each instanceof PrimaryKeyPosition) {
- String primaryKey = tableMetaData.getPrimaryKeyColumns().get(0);
- dumperConfig.setPrimaryKey(primaryKey);
- dumperConfig.setUniqueKeyDataType(tableMetaData.getColumnMetaData(primaryKey).getDataType());
+ dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+ dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
break;
}
}
// Do NOT filter FinishedPosition here, since whole inventory tasks are required in job progress when persisting to register center.
return result;
}
- checkPrimaryKey(tableMetaData, actualTableName);
- String primaryKey = tableMetaData.getPrimaryKeyColumns().get(0);
- dumperConfig.setPrimaryKey(primaryKey);
- int primaryKeyDataType = tableMetaData.getColumnMetaData(primaryKey).getDataType();
- dumperConfig.setUniqueKeyDataType(primaryKeyDataType);
- if (PipelineJdbcUtils.isIntegerColumn(primaryKeyDataType)) {
+ dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+ int uniqueKeyDataType = uniqueKeyColumn.getDataType();
+ dumperConfig.setUniqueKeyDataType(uniqueKeyDataType);
+ if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
return getPositionByIntegerPrimaryKeyRange(jobContext, dataSource, dumperConfig);
- } else if (PipelineJdbcUtils.isStringColumn(primaryKeyDataType)) {
+ } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
return getPositionByStringPrimaryKeyRange();
} else {
throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is not integer or string type", actualTableName));
}
}
- private void checkPrimaryKey(final PipelineTableMetaData tableMetaData, final String tableName) {
+ private PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
if (null == tableMetaData) {
throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: can not get table metadata ", tableName));
}
List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
- if (null == primaryKeys || primaryKeys.isEmpty()) {
- throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key", tableName));
- }
if (primaryKeys.size() > 1) {
throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is union primary", tableName));
}
+ if (1 == primaryKeys.size()) {
+ return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+ }
+ Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
+ if (uniqueIndexes.isEmpty()) {
+ throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key or unique index", tableName));
+ }
+ if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
+ PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
+ if (!column.isNullable()) {
+ return column;
+ }
+
+ }
+ throw new PipelineJobCreationException(
+ String.format("Can not split range for table %s, reason: table contains multiple unique index or unique index contains nullable/multiple column(s)", tableName));
}
private Collection<IngestPosition<?>> getPositionByIntegerPrimaryKeyRange(final RuleAlteredJobContext jobContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
Collection<IngestPosition<?>> result = new LinkedList<>();
RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
String sql = PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType())
- .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey());
+ .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName(), dumperConfig.getUniqueKey());
int shardingSize = jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getInput().getShardingSize();
try (
Connection connection = dataSource.getConnection();
@@ -194,7 +207,7 @@ public final class InventoryTaskSplitter {
}
long endId = rs.getLong(1);
if (endId == 0) {
- log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey(), beginId);
+ log.info("getPositionByPrimaryKeyRange, endId is 0, break, tableName={}, primaryKey={}, beginId={}", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), beginId);
break;
}
result.add(new IntegerPrimaryKeyPosition(beginId, endId));
@@ -206,7 +219,7 @@ public final class InventoryTaskSplitter {
result.add(new IntegerPrimaryKeyPosition(0, 0));
}
} catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getActualTableName(), dumperConfig.getPrimaryKey()), ex);
+ throw new PipelineJobPrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getActualTableName(), dumperConfig.getUniqueKey()), ex);
}
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 65ab8bc84fb..1adf68bc201 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -131,7 +131,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
record.setType(IngestDataChangeType.INSERT);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i);
- record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isPrimaryKey()));
+ record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.isUniqueKey(i)));
}
pushRecord(record);
}
@@ -168,7 +168,7 @@ public final class MySQLIncrementalDumper extends AbstractIncrementalDumper<Binl
record.setType(IngestDataChangeType.DELETE);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i);
- record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isPrimaryKey()));
+ record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, tableMetaData.isUniqueKey(i)));
}
pushRecord(record);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index a50a48c28a4..870f676e3ff 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -50,7 +50,7 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
- if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
+ if (column.isUniqueKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
continue;
}
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index a56c56781a8..9bd08e7ad72 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -54,7 +54,7 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
@Override
public List<Column> extractUpdatedColumns(final DataRecord record, final Map<LogicTableName, Set<String>> shardingColumnsMap) {
- return record.getColumns().stream().filter(each -> !(each.isPrimaryKey() || isShardingColumn(shardingColumnsMap, record.getTableName(), each.getName()))).collect(Collectors.toList());
+ return record.getColumns().stream().filter(each -> !(each.isUniqueKey() || isShardingColumn(shardingColumnsMap, record.getTableName(), each.getName()))).collect(Collectors.toList());
}
private String buildConflictSQL(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index e3403ae3061..cc9e26117d4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -110,6 +110,7 @@ public final class WalEventConverter {
// TODO completion columns
DataRecord result = createDataRecord(event, event.getPrimaryKeys().size());
result.setType(IngestDataChangeType.DELETE);
+ // TODO Unique key may be a column within unique index
List<String> primaryKeyColumns = getPipelineTableMetaData(event.getTableName()).getPrimaryKeyColumns();
for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), true, true));
@@ -125,9 +126,9 @@ public final class WalEventConverter {
private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final List<Object> values) {
for (int i = 0, count = values.size(); i < count; i++) {
- boolean isPrimaryKey = tableMetaData.isPrimaryKey(i);
- Object primaryKeyOldValue = isPrimaryKey ? values.get(i) : null;
- Column column = new Column(tableMetaData.getColumnMetaData(i).getName(), primaryKeyOldValue, values.get(i), true, isPrimaryKey);
+ boolean isUniqueKey = tableMetaData.isUniqueKey(i);
+ Object uniqueKeyOldValue = isUniqueKey ? values.get(i) : null;
+ Column column = new Column(tableMetaData.getColumnMetaData(i).getName(), uniqueKeyOldValue, values.get(i), true, isUniqueKey);
dataRecord.addColumn(column);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 937bb5bf070..e7fda9f2e84 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -61,7 +61,7 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
result.append(") DO UPDATE SET ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
- if (column.isPrimaryKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
+ if (column.isUniqueKey() || isShardingColumn(shardingColumnsMap, dataRecord.getTableName(), column.getName())) {
continue;
}
result.append(quote(column.getName())).append("=EXCLUDED.").append(quote(column.getName())).append(",");
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 967adc6733b..c4891041c91 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -152,7 +152,7 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setPosition(new PlaceholderPosition());
dumperConfig.setActualTableName("t_order");
dumperConfig.setLogicTableName("t_order");
- dumperConfig.setPrimaryKey("order_id");
+ dumperConfig.setUniqueKey("order_id");
dumperConfig.setUniqueKeyDataType(Types.INTEGER);
dumperConfig.setShardingItem(0);
PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
index 3d0da54f2e3..c45df0c9d3b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
import org.junit.Before;
@@ -33,6 +34,7 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.Collection;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
@@ -52,12 +54,14 @@ public final class PipelineTableMetaDataLoaderTest {
private static final String DATA_TYPE = "DATA_TYPE";
- private static final String TYPE_NAME = "TYPE_NAME";
-
private static final String TABLE_NAME = "TABLE_NAME";
+ private static final String INDEX_NAME = "INDEX_NAME";
+
private static final String TEST_TABLE = "test";
+ private static final String TEST_INDEX = "idx_test";
+
private PipelineDataSourceWrapper dataSource;
@Mock
@@ -72,6 +76,9 @@ public final class PipelineTableMetaDataLoaderTest {
@Mock
private ResultSet columnMetaDataResultSet;
+ @Mock
+ private ResultSet indexInfoResultSet;
+
@Before
public void setUp() throws SQLException {
DataSource rawDataSource = mock(DataSource.class);
@@ -88,13 +95,20 @@ public final class PipelineTableMetaDataLoaderTest {
when(columnMetaDataResultSet.getInt(ORDINAL_POSITION)).thenReturn(1, 2, 3);
when(columnMetaDataResultSet.getString(COLUMN_NAME)).thenReturn("id", "name", "age");
when(columnMetaDataResultSet.getInt(DATA_TYPE)).thenReturn(Types.BIGINT, Types.VARCHAR, Types.INTEGER);
+ when(databaseMetaData.getIndexInfo(TEST_CATALOG, null, TEST_TABLE, true, false)).thenReturn(indexInfoResultSet);
+ when(indexInfoResultSet.next()).thenReturn(true, true, false);
+ when(indexInfoResultSet.getString(INDEX_NAME)).thenReturn(TEST_INDEX);
+ when(indexInfoResultSet.getString(COLUMN_NAME)).thenReturn("name", "id");
+ when(indexInfoResultSet.getShort(ORDINAL_POSITION)).thenReturn((short) 2, (short) 1);
}
@Test
public void assertGetTableMetaData() {
PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
- assertColumnMetaData(metaDataLoader.getTableMetaData(null, TEST_TABLE));
- assertPrimaryKeys(metaDataLoader.getTableMetaData(null, TEST_TABLE).getPrimaryKeyColumns());
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(null, TEST_TABLE);
+ assertColumnMetaData(tableMetaData);
+ assertPrimaryKeys(tableMetaData.getPrimaryKeyColumns());
+ assertIndexMetaData(tableMetaData.getUniqueIndexes());
}
private void assertPrimaryKeys(final List<String> actual) {
@@ -114,6 +128,15 @@ public final class PipelineTableMetaDataLoaderTest {
assertThat(actual.getDataType(), is(expectedType));
}
+ private void assertIndexMetaData(final Collection<PipelineIndexMetaData> actualUniqueIndexes) {
+ assertThat(actualUniqueIndexes.size(), is(1));
+ PipelineIndexMetaData actualIndexMetaData = actualUniqueIndexes.iterator().next();
+ assertThat(actualIndexMetaData.getName(), is(TEST_INDEX));
+ assertThat(actualIndexMetaData.getColumns().size(), is(2));
+ assertThat(actualIndexMetaData.getColumns().get(0).getName(), is("id"));
+ assertThat(actualIndexMetaData.getColumns().get(1).getName(), is("name"));
+ }
+
@Test(expected = RuntimeException.class)
public void assertGetTableMetaDataFailure() throws SQLException {
when(dataSource.getConnection()).thenThrow(new SQLException(""));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index bc76828ece1..6e153b5678d 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -20,14 +20,14 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.model;
import org.junit.Before;
import org.junit.Test;
+import java.sql.Types;
+import java.util.Collections;
+
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
-
-import java.sql.Types;
-import java.util.Collections;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class PipelineTableMetaDataTest {
@@ -35,7 +35,8 @@ public final class PipelineTableMetaDataTest {
@Before
public void setUp() {
- pipelineTableMetaData = new PipelineTableMetaData("test_data", Collections.singletonMap("test", new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true)));
+ PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", Types.INTEGER, "INTEGER", true, true);
+ pipelineTableMetaData = new PipelineTableMetaData("test_data", Collections.singletonMap("test", column), Collections.emptySet());
}
@Test
@@ -59,7 +60,7 @@ public final class PipelineTableMetaDataTest {
@Test
public void assertIsPrimaryKey() {
- assertTrue(pipelineTableMetaData.isPrimaryKey(0));
- assertFalse(pipelineTableMetaData.isPrimaryKey(1));
+ assertTrue(pipelineTableMetaData.isUniqueKey(0));
+ assertFalse(pipelineTableMetaData.isUniqueKey(1));
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 1fd66a11261..5bd098d8724 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -107,7 +107,7 @@ public final class InventoryTaskTest {
InventoryDumperConfiguration result = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
result.setLogicTableName(logicTableName);
result.setActualTableName(actualTableName);
- result.setPrimaryKey("order_id");
+ result.setUniqueKey("order_id");
result.setUniqueKeyDataType(Types.INTEGER);
result.setPosition(null == taskConfig.getDumperConfig().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperConfig().getPosition());
return result;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index bce1657359f..7cd1f55f352 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -96,6 +96,13 @@ public final class InventoryTaskSplitterTest {
inventoryTaskSplitter.splitInventoryData(jobContext);
}
+ @Test
+ public void assertSplitInventoryDataWithoutPrimaryButWithUniqueIndex() throws SQLException {
+ initUniqueIndexOnNotNullColumnEnvironment(taskConfig.getDumperConfig());
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
+ assertThat(actual.size(), is(1));
+ }
+
@Test(expected = PipelineJobCreationException.class)
public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
@@ -103,7 +110,7 @@ public final class InventoryTaskSplitterTest {
}
@Test(expected = PipelineJobCreationException.class)
- public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
+ public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException {
initNoPrimaryEnvironment(taskConfig.getDumperConfig());
inventoryTaskSplitter.splitInventoryData(jobContext);
}
@@ -163,4 +170,16 @@ public final class InventoryTaskSplitterTest {
statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
}
}
+
+ private void initUniqueIndexOnNotNullColumnEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
+ DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
+ try (
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("DROP TABLE IF EXISTS t_order");
+ statement.execute("CREATE TABLE t_order (order_id INT NOT NULL, user_id VARCHAR(12))");
+ statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+ statement.execute("CREATE UNIQUE INDEX unique_order_id ON t_order (order_id)");
+ }
+ }
}