You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/02/10 10:17:28 UTC
[shardingsphere] branch master updated: Pipeline job support any type of single column unique key table (#24108)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c6309070a84 Pipeline job support any type of single column unique key table (#24108)
c6309070a84 is described below
commit c6309070a847c3ca12939167805ca44e421ecdeb
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Feb 10 18:17:08 2023 +0800
Pipeline job support any type of single column unique key table (#24108)
* Pipeline job support any type of single column unique key table
* Add starting and finished log in PipelineWatcher
* Refactor REFRESH TABLE METADATA invocation
* Refactor containerComposer.start()
* Refactor NoUniqueKeyMigrationE2EIT to IndexesMigrationE2EIT; Refactor mysql.xml to none.xml
* Support no unique key migration consistency check
* Add special type single column unique key migration E2E
* Fix unit test
---
.../api/ingest/position/NoUniqueKeyPosition.java | 3 --
.../api/ingest/position/PrimaryKeyPosition.java | 2 +-
.../ingest/position/PrimaryKeyPositionFactory.java | 22 +++++----
.../ingest/position/UnsupportedKeyPosition.java} | 34 +++++++++----
.../spi/sqlbuilder/PipelineSQLBuilder.java | 6 +--
.../core/ingest/dumper/InventoryDumper.java | 51 ++++----------------
.../YamlJobItemInventoryTasksProgressSwapper.java | 9 ++--
.../core/prepare/InventoryTaskSplitter.java | 16 +++----
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 8 ++--
.../fixture/FixturePipelineSQLBuilder.java | 4 +-
.../MigrationDataConsistencyChecker.java | 6 ---
.../pipeline/cases/base/PipelineBaseE2EIT.java | 2 -
.../general/MySQLMigrationGeneralE2EIT.java | 1 +
.../general/PostgreSQLMigrationGeneralE2EIT.java | 1 +
...rationE2EIT.java => IndexesMigrationE2EIT.java} | 56 ++++++++++++++++++----
.../container/compose/NativeContainerComposer.java | 5 --
.../framework/param/PipelineTestParameter.java | 1 +
.../framework/watcher/PipelineWatcher.java | 7 +++
.../resources/env/common/migration-command.xml | 6 +--
.../none_primary_key/mysql.xml => common/none.xml} | 8 +---
.../ingest/position/NoUniqueKeyPositionTest.java | 45 +++++++----------
.../position/UnsupportedKeyPositionTest.java | 41 ++++++++++++++++
.../core/prepare/InventoryTaskSplitterTest.java | 8 ++--
23 files changed, 188 insertions(+), 154 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
index 911842bf9b2..01dd783d010 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
@@ -17,12 +17,9 @@
package org.apache.shardingsphere.data.pipeline.api.ingest.position;
-import lombok.RequiredArgsConstructor;
-
/**
* No unique key position.
*/
-@RequiredArgsConstructor
public final class NoUniqueKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<NoUniqueKeyPosition> {
@Override
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java
index 6f79125bc67..e690e1acad8 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPosition.java
@@ -42,6 +42,6 @@ public abstract class PrimaryKeyPosition<T> {
@Override
public final String toString() {
- return String.format("%s,%s,%s", getType(), getBeginValue(), getEndValue());
+ return String.format("%s,%s,%s", getType(), null != getBeginValue() ? getBeginValue() : "", null != getEndValue() ? getEndValue() : "");
}
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
index da318934ade..06d20776135 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/PrimaryKeyPositionFactory.java
@@ -18,7 +18,9 @@
package org.apache.shardingsphere.data.pipeline.api.ingest.position;
import com.google.common.base.Preconditions;
-import lombok.NonNull;
+import com.google.common.base.Splitter;
+
+import java.util.List;
/**
* Primary key position factory.
@@ -32,12 +34,12 @@ public final class PrimaryKeyPositionFactory {
* @return primary key position
*/
public static IngestPosition<?> newInstance(final String data) {
- String[] array = data.split(",");
- Preconditions.checkArgument(3 == array.length, "Unknown primary key position: " + data);
- Preconditions.checkArgument(1 == array[0].length(), "Invalid primary key position type: " + array[0]);
- char type = array[0].charAt(0);
- String beginValue = array[1];
- String endValue = array[2];
+ List<String> parts = Splitter.on(',').splitToList(data);
+ Preconditions.checkArgument(3 == parts.size(), "Unknown primary key position: " + data);
+ Preconditions.checkArgument(1 == parts.get(0).length(), "Invalid primary key position type: " + parts.get(0));
+ char type = parts.get(0).charAt(0);
+ String beginValue = parts.get(1);
+ String endValue = parts.get(2);
switch (type) {
case 'i':
return new IntegerPrimaryKeyPosition(Long.parseLong(beginValue), Long.parseLong(endValue));
@@ -45,6 +47,8 @@ public final class PrimaryKeyPositionFactory {
return new StringPrimaryKeyPosition(beginValue, endValue);
case 'n':
return new NoUniqueKeyPosition();
+ case 'u':
+ return new UnsupportedKeyPosition();
default:
throw new IllegalArgumentException("Unknown primary key position type: " + type);
}
@@ -57,13 +61,13 @@ public final class PrimaryKeyPositionFactory {
* @param endValue end value
* @return ingest position
*/
- public static IngestPosition<?> newInstance(final @NonNull Object beginValue, final @NonNull Object endValue) {
+ public static IngestPosition<?> newInstance(final Object beginValue, final Object endValue) {
if (beginValue instanceof Number) {
return new IntegerPrimaryKeyPosition(((Number) beginValue).longValue(), ((Number) endValue).longValue());
}
if (beginValue instanceof CharSequence) {
return new StringPrimaryKeyPosition(beginValue.toString(), endValue.toString());
}
- throw new IllegalArgumentException("Unknown begin value type: " + beginValue.getClass().getName());
+ return new UnsupportedKeyPosition();
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineJobUniqueKeyDataTypeException.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
similarity index 53%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineJobUniqueKeyDataTypeException.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
index 86ce80c12c8..845ee2581c6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineJobUniqueKeyDataTypeException.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/UnsupportedKeyPosition.java
@@ -15,19 +15,35 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.exception.data;
-
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+package org.apache.shardingsphere.data.pipeline.api.ingest.position;
/**
- * Unsupported pipeline job unique key data type exception.
+ * Unsupported key position.
*/
-public final class UnsupportedPipelineJobUniqueKeyDataTypeException extends PipelineSQLException {
+public final class UnsupportedKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<UnsupportedKeyPosition> {
+
+ @Override
+ public Void getBeginValue() {
+ return null;
+ }
+
+ @Override
+ public Void getEndValue() {
+ return null;
+ }
+
+ @Override
+ protected Void convert(final String value) {
+ throw new UnsupportedOperationException();
+ }
- private static final long serialVersionUID = -1605633809724671592L;
+ @Override
+ protected char getType() {
+ return 'u';
+ }
- public UnsupportedPipelineJobUniqueKeyDataTypeException(final int uniqueKeyDataType) {
- super(XOpenSQLState.FEATURE_NOT_SUPPORTED, 32, String.format("Unsupported data type `%s` of unique key for pipeline job.", uniqueKeyDataType));
+ @Override
+ public int compareTo(final UnsupportedKeyPosition position) {
+ return 0;
}
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 5406f5cbc8f..edff4ca1d18 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -47,10 +47,9 @@ public interface PipelineSQLBuilder extends TypedSPI {
* @param tableName table name
* @param uniqueKey unique key
* @param uniqueKeyDataType unique key data type
- * @param firstQuery whether it's the first time query
* @return divisible inventory dump SQL
*/
- String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType, boolean firstQuery);
+ String buildDivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType);
/**
* Build indivisible inventory dump first SQL.
@@ -59,10 +58,9 @@ public interface PipelineSQLBuilder extends TypedSPI {
* @param tableName table name
* @param uniqueKey unique key
* @param uniqueKeyDataType unique key data type
- * @param firstQuery whether it's the first time query
* @return indivisible inventory dump SQL
*/
- String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType, boolean firstQuery);
+ String buildIndivisibleInventoryDumpSQL(String schemaName, String tableName, String uniqueKey, int uniqueKeyDataType);
/**
* Build inventory dump all SQL.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index c1621b5171e..218c293d07c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -37,7 +37,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineJobUniqueKeyDataTypeException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
@@ -54,7 +53,6 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Optional;
/**
* Inventory dumper.
@@ -90,25 +88,15 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
@Override
protected void runBlocking() {
- String firstSQL = buildInventoryDumpSQL(true);
- String laterSQL = buildInventoryDumpSQL(false);
IngestPosition<?> position = dumperConfig.getPosition();
if (position instanceof FinishedPosition) {
log.info("Ignored because of already finished.");
return;
}
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
- Object beginUniqueKeyValue = ((PrimaryKeyPosition<?>) position).getBeginValue();
- int round = 1;
try (Connection connection = dataSource.getConnection()) {
- Optional<Object> maxUniqueKeyValue;
- while ((maxUniqueKeyValue = dump(tableMetaData, connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
- beginUniqueKeyValue = maxUniqueKeyValue.get();
- if (!isRunning()) {
- break;
- }
- }
- log.info("Inventory dump done, round={}, maxUniqueKeyValue={}.", round, maxUniqueKeyValue);
+ dump(tableMetaData, connection, buildInventoryDumpSQL(), ((PrimaryKeyPosition<?>) position).getBeginValue());
+ log.info("Inventory dump done");
} catch (final SQLException ex) {
log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
throw new IngestException(ex);
@@ -117,69 +105,48 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
}
}
- private String buildInventoryDumpSQL(final boolean firstQuery) {
+ private String buildInventoryDumpSQL() {
String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
if (null == dumperConfig.getUniqueKey()) {
return sqlBuilder.buildInventoryDumpAllSQL(schemaName, dumperConfig.getActualTableName());
}
if (PipelineJdbcUtils.isIntegerColumn(dumperConfig.getUniqueKeyDataType())) {
- return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), firstQuery);
- }
- if (PipelineJdbcUtils.isStringColumn(dumperConfig.getUniqueKeyDataType())) {
- return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), firstQuery);
+ return sqlBuilder.buildDivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType());
}
- throw new UnsupportedPipelineJobUniqueKeyDataTypeException(dumperConfig.getUniqueKeyDataType());
+ return sqlBuilder.buildIndivisibleInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType());
}
- private Optional<Object> dump(final PipelineTableMetaData tableMetaData, final Connection connection, final String sql, final Object beginUniqueKeyValue, final int round) throws SQLException {
+ private void dump(final PipelineTableMetaData tableMetaData, final Connection connection, final String sql, final Object beginUniqueKeyValue) throws SQLException {
if (null != dumperConfig.getRateLimitAlgorithm()) {
dumperConfig.getRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
}
int batchSize = dumperConfig.getBatchSize();
try (PreparedStatement preparedStatement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
dumpStatement = preparedStatement;
- setParameters(preparedStatement, batchSize, beginUniqueKeyValue);
+ preparedStatement.setFetchSize(batchSize);
+ setParameters(preparedStatement, beginUniqueKeyValue);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- int rowCount = 0;
- Object maxUniqueKeyValue = null;
while (resultSet.next()) {
channel.pushRecord(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
- rowCount++;
- if (null != dumperConfig.getUniqueKey()) {
- maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, tableMetaData.getColumnMetaData(dumperConfig.getUniqueKey()).getOrdinalPosition());
- }
if (!isRunning()) {
log.info("Broke because of inventory dump is not running.");
break;
}
}
- if (0 == round % 50) {
- log.info("Dumping, round={}, rowCount={}, maxUniqueKeyValue={}.", round, rowCount, maxUniqueKeyValue);
- }
dumpStatement = null;
- return Optional.ofNullable(maxUniqueKeyValue);
}
}
}
- private void setParameters(final PreparedStatement preparedStatement, final int batchSize, final Object beginUniqueKeyValue) throws SQLException {
- preparedStatement.setFetchSize(batchSize);
+ private void setParameters(final PreparedStatement preparedStatement, final Object beginUniqueKeyValue) throws SQLException {
if (null == dumperConfig.getUniqueKey()) {
return;
}
if (PipelineJdbcUtils.isIntegerColumn(dumperConfig.getUniqueKeyDataType())) {
preparedStatement.setObject(1, beginUniqueKeyValue);
preparedStatement.setObject(2, ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
- preparedStatement.setInt(3, batchSize);
- return;
- }
- if (PipelineJdbcUtils.isStringColumn(dumperConfig.getUniqueKeyDataType())) {
- preparedStatement.setObject(1, beginUniqueKeyValue);
- preparedStatement.setInt(2, batchSize);
- return;
}
- throw new UnsupportedPipelineJobUniqueKeyDataTypeException(dumperConfig.getUniqueKeyDataType());
}
private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
index add094b8604..6b3af441973 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemInventoryTasksProgressSwapper.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -54,13 +55,13 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
private String[] getFinished(final JobItemInventoryTasksProgress progress) {
return progress.getInventoryTaskProgressMap().entrySet().stream()
.filter(entry -> entry.getValue().getPosition() instanceof FinishedPosition)
- .map(Map.Entry::getKey).toArray(String[]::new);
+ .map(Entry::getKey).toArray(String[]::new);
}
private Map<String, String> getUnfinished(final JobItemInventoryTasksProgress progress) {
return progress.getInventoryTaskProgressMap().entrySet().stream()
.filter(entry -> !(entry.getValue().getPosition() instanceof FinishedPosition))
- .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition().toString()));
+ .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition().toString()));
}
/**
@@ -75,11 +76,11 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
}
Map<String, InventoryTaskProgress> taskProgressMap = new LinkedHashMap<>();
taskProgressMap.putAll(Arrays.stream(yamlProgress.getFinished()).collect(Collectors.toMap(key -> key, value -> new InventoryTaskProgress(new FinishedPosition()))));
- taskProgressMap.putAll(yamlProgress.getUnfinished().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, getInventoryTaskProgressFunction())));
+ taskProgressMap.putAll(yamlProgress.getUnfinished().entrySet().stream().collect(Collectors.toMap(Entry::getKey, getInventoryTaskProgressFunction())));
return new JobItemInventoryTasksProgress(taskProgressMap);
}
- private Function<Map.Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() {
+ private Function<Entry<String, String>, InventoryTaskProgress> getInventoryTaskProgressFunction() {
return entry -> new InventoryTaskProgress(
Strings.isNullOrEmpty(entry.getValue()) ? new PlaceholderPosition() : PrimaryKeyPositionFactory.newInstance(entry.getValue()));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 326d376f561..49c2e98ac5c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -156,12 +155,9 @@ public final class InventoryTaskSplitter {
}
int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
- return getPositionByIntegerPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
+ return getPositionByIntegerUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
}
- if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
- return getPositionByStringPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
- }
- throw new SplitPipelineJobByRangeException(dumperConfig.getActualTableName(), "primary key is not integer or string type");
+ return getPositionByStringUniqueKeyRange(jobItemContext, dataSource, dumperConfig);
}
private Collection<IngestPosition<?>> getPositionWithoutUniqueKey(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
@@ -189,8 +185,8 @@ public final class InventoryTaskSplitter {
}
}
- private Collection<IngestPosition<?>> getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
- final InventoryDumperConfiguration dumperConfig) {
+ private Collection<IngestPosition<?>> getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+ final InventoryDumperConfiguration dumperConfig) {
Collection<IngestPosition<?>> result = new LinkedList<>();
PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
String sql = TypedSPILoader.getService(PipelineSQLBuilder.class, jobConfig.getSourceDatabaseType())
@@ -229,8 +225,8 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<IngestPosition<?>> getPositionByStringPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
- final InventoryDumperConfiguration dumperConfig) {
+ private Collection<IngestPosition<?>> getPositionByStringUniqueKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
+ final InventoryDumperConfiguration dumperConfig) {
long tableRecordsCount = getTableRecordsCount(jobItemContext, dataSource, dumperConfig);
jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
Collection<IngestPosition<?>> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 8928e401973..ac3455e9b74 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -71,17 +71,17 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
protected abstract String getRightIdentifierQuoteString();
@Override
- public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+ public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = quote(uniqueKey);
- return String.format("SELECT * FROM %s WHERE %s%s? AND %s<=? ORDER BY %s ASC LIMIT ?", qualifiedTableName, quotedUniqueKey, firstQuery ? ">=" : ">", quotedUniqueKey, quotedUniqueKey);
+ return String.format("SELECT * FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey, quotedUniqueKey, quotedUniqueKey);
}
@Override
- public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+ public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = quote(uniqueKey);
- return String.format("SELECT * FROM %s WHERE %s%s? ORDER BY %s ASC LIMIT ?", qualifiedTableName, quotedUniqueKey, firstQuery ? ">=" : ">", quotedUniqueKey);
+ return String.format("SELECT * FROM %s ORDER BY %s ASC", qualifiedTableName, quotedUniqueKey);
}
protected final String getQualifiedTableName(final String schemaName, final String tableName) {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 3db2baad3f5..08e46bfa21e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -29,12 +29,12 @@ import java.util.Optional;
public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
@Override
- public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+ public String buildDivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
return "";
}
@Override
- public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType, final boolean firstQuery) {
+ public String buildIndivisibleInventoryDumpSQL(final String schemaName, final String tableName, final String uniqueKey, final int uniqueKeyDataType) {
return "";
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index cf94daf6a76..a62c9cb83e9 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckIgnoredType;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
@@ -80,11 +79,6 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
progressContext.getTableNames().add(jobConfig.getSourceTableName());
Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
- if (null == jobConfig.getUniqueKeyColumn()) {
- progressContext.getIgnoredTableNames().add(sourceTable.getTableName().getOriginal());
- result.put(sourceTable.getTableName().getOriginal(), new DataConsistencyCheckResult(DataConsistencyCheckIgnoredType.NO_UNIQUE_KEY));
- return result;
- }
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 311d40e4337..13c1a671b82 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -114,7 +114,6 @@ public abstract class PipelineBaseE2EIT {
containerComposer = ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER
? new DockerContainerComposer(testParam.getDatabaseType(), testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
: new NativeContainerComposer(testParam.getDatabaseType());
- containerComposer.start();
if (ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER) {
DockerStorageContainer storageContainer = ((DockerContainerComposer) containerComposer).getStorageContainers().get(0);
username = storageContainer.getUsername();
@@ -357,7 +356,6 @@ public abstract class PipelineBaseE2EIT {
}
protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) throws SQLException {
- proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
String countSQL = Strings.isNullOrEmpty(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema);
Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
log.info("actual count {}", actual.get("count"));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index bc01c64ba87..540ee00ad0e 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -108,6 +108,7 @@ public final class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
}
List<String> lastJobIds = listJobId();
assertThat(lastJobIds.size(), is(0));
+ proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT, "");
log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d1b44b688a1..d9fd0aa96fe 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -101,6 +101,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
}
List<String> lastJobIds = listJobId();
assertThat(lastJobIds.size(), is(0));
+ proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
assertGreaterThanOrderTableInitRows(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1, PipelineBaseE2EIT.SCHEMA_NAME);
log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
similarity index 63%
rename from test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
rename to test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index c3f5489d48d..56145d8ec51 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NoUniqueKeyMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,12 +20,13 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
-import org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -42,13 +43,18 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
+/**
+ * E2E IT for different types of indexes, includes:
+ * 1) no unique key.
+ * 2) special type single column unique key.
+ */
@RunWith(Parameterized.class)
@Slf4j
-public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
+public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
private final PipelineTestParameter testParam;
- public NoUniqueKeyMigrationE2EIT(final PipelineTestParameter testParam) {
+ public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
super(testParam);
this.testParam = testParam;
}
@@ -60,7 +66,7 @@ public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
return result;
}
for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
- result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/scenario/primary_key/none_primary_key/mysql.xml"));
+ result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/common/none.xml"));
}
return result;
}
@@ -71,12 +77,38 @@ public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
}
@Test
- public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
- log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+ public void assertNoUniqueKeyMigrationSuccess() throws SQLException, InterruptedException {
+ String sql;
+ String consistencyCheckAlgorithmType;
+ if (getDatabaseType() instanceof MySQLDatabaseType) {
+ sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ // DATA_MATCH doesn't supported, could not order by records
+ consistencyCheckAlgorithmType = "CRC32_MATCH";
+ } else {
+ return;
+ }
+ assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ }
+
+ @Test
+ public void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess() throws SQLException, InterruptedException {
+ String sql;
+ String consistencyCheckAlgorithmType;
+ if (getDatabaseType() instanceof MySQLDatabaseType) {
+ sql = "CREATE TABLE `%s` (`order_id` VARBINARY(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ // DATA_MATCH doesn't supported: Order by value must implements Comparable
+ consistencyCheckAlgorithmType = "CRC32_MATCH";
+ } else {
+ return;
+ }
+ assertMigrationSuccess(sql, consistencyCheckAlgorithmType);
+ }
+
+ private void assertMigrationSuccess(final String sqlPattern, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
- createSourceOrderTable();
+ createSourceOrderTable(sqlPattern);
try (Connection connection = getSourceDataSource().getConnection()) {
- AutoIncrementKeyGenerateAlgorithm generateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
+ KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
}
addMigrationProcessConfig();
@@ -86,12 +118,16 @@ public final class NoUniqueKeyMigrationE2EIT extends AbstractMigrationE2EIT {
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
String jobId = listJobId().get(0);
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+ commitMigrationByJobId(jobId);
proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
assertTargetAndSourceCountAreSame();
- commitMigrationByJobId(jobId);
List<String> lastJobIds = listJobId();
assertThat(lastJobIds.size(), is(0));
- log.info("{} E2E IT finished, database type={}, docker image={}", this.getClass().getName(), testParam.getDatabaseType(), testParam.getStorageContainerImage());
+ }
+
+ private void createSourceOrderTable(final String sqlPattern) throws SQLException {
+ sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
}
private void assertTargetAndSourceCountAreSame() {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java
index 4321940f414..e37d8c84510 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/container/compose/NativeContainerComposer.java
@@ -47,11 +47,6 @@ public final class NativeContainerComposer extends BaseContainerComposer {
this.databaseType = databaseType;
}
- @Override
- public void start() {
- // do nothing
- }
-
@SneakyThrows(SQLException.class)
@Override
public void cleanUpDatabase(final String databaseName) {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java
index e6ec743e71f..5425b9ee7e6 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/param/PipelineTestParameter.java
@@ -31,6 +31,7 @@ public final class PipelineTestParameter {
private final String storageContainerImage;
+ // TODO It's not scenario. Remove it later
private final String scenario;
private final int storageContainerCount;
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
index fa765e17ccb..156aaf59385 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/watcher/PipelineWatcher.java
@@ -80,8 +80,15 @@ public class PipelineWatcher extends TestWatcher {
}
}
+ @Override
+ protected void starting(final Description description) {
+ log.info("starting: {}", description);
+ containerComposer.start();
+ }
+
@Override
protected void finished(final Description description) {
+ log.info("finished: {}", description);
containerComposer.close();
}
}
diff --git a/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml b/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml
index cc0a0de9a50..4592ef89ff3 100644
--- a/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml
+++ b/test/e2e/pipeline/src/test/resources/env/common/migration-command.xml
@@ -67,8 +67,7 @@
CREATE SHARDING TABLE RULE t_order(
STORAGE_UNITS(ds_2,ds_3,ds_4),
SHARDING_COLUMN=user_id,
- TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
- KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+ TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6"))
);
</create-target-order-table-rule>
@@ -76,8 +75,7 @@
CREATE SHARDING TABLE RULE t_order_item(
DATANODES('ds_${2..4}.t_order_item_${0..1}'),
DATABASE_STRATEGY(TYPE='standard',SHARDING_COLUMN=user_id,SHARDING_ALGORITHM(TYPE(NAME='inline',PROPERTIES('algorithm-expression'='ds_${user_id % 3 + 2}')))),
- TABLE_STRATEGY(TYPE='standard',SHARDING_COLUMN=order_id,SHARDING_ALGORITHM(TYPE(NAME='inline',PROPERTIES('algorithm-expression'='t_order_item_${order_id % 2}')))),
- KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME='snowflake'))
+ TABLE_STRATEGY(TYPE='standard',SHARDING_COLUMN=order_id,SHARDING_ALGORITHM(TYPE(NAME='inline',PROPERTIES('algorithm-expression'='t_order_item_${order_id % 2}'))))
);
</create-target-order-item-table-rule>
diff --git a/test/e2e/pipeline/src/test/resources/env/scenario/primary_key/none_primary_key/mysql.xml b/test/e2e/pipeline/src/test/resources/env/common/none.xml
similarity index 78%
rename from test/e2e/pipeline/src/test/resources/env/scenario/primary_key/none_primary_key/mysql.xml
rename to test/e2e/pipeline/src/test/resources/env/common/none.xml
index 58434662c39..ce00c632cb3 100644
--- a/test/e2e/pipeline/src/test/resources/env/scenario/primary_key/none_primary_key/mysql.xml
+++ b/test/e2e/pipeline/src/test/resources/env/common/none.xml
@@ -14,12 +14,6 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
+<!-- TODO remove it later -->
<command>
- <create-table-order>
- CREATE TABLE `%s` (
- `order_id` INT NOT NULL,
- `user_id` INT NOT NULL,
- `status` varchar(255) NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- </create-table-order>
</command>
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/NoUniqueKeyPositionTest.java
similarity index 50%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
copy to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/NoUniqueKeyPositionTest.java
index 911842bf9b2..2adec288d17 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NoUniqueKeyPosition.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/NoUniqueKeyPositionTest.java
@@ -15,38 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.ingest.position;
+package org.apache.shardingsphere.test.it.data.pipeline.core.ingest.position;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.NoUniqueKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
+import org.junit.Test;
-/**
- * No unique key position.
- */
-@RequiredArgsConstructor
-public final class NoUniqueKeyPosition extends PrimaryKeyPosition<Void> implements IngestPosition<NoUniqueKeyPosition> {
-
- @Override
- public Void getBeginValue() {
- return null;
- }
-
- @Override
- public Void getEndValue() {
- return null;
- }
-
- @Override
- protected Void convert(final String value) {
- throw new UnsupportedOperationException();
- }
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class NoUniqueKeyPositionTest {
- @Override
- protected char getType() {
- return 'n';
+ @Test
+ public void assertInit() {
+ NoUniqueKeyPosition position = (NoUniqueKeyPosition) PrimaryKeyPositionFactory.newInstance("n,,");
+ assertNull(position.getBeginValue());
+ assertNull(position.getEndValue());
}
- @Override
- public int compareTo(final NoUniqueKeyPosition position) {
- return 0;
+ @Test
+ public void assertToString() {
+ assertThat(new NoUniqueKeyPosition().toString(), is("n,,"));
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/UnsupportedKeyPositionTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/UnsupportedKeyPositionTest.java
new file mode 100644
index 00000000000..01073a68677
--- /dev/null
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/ingest/position/UnsupportedKeyPositionTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.it.data.pipeline.core.ingest.position;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPositionFactory;
+import org.apache.shardingsphere.data.pipeline.api.ingest.position.UnsupportedKeyPosition;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class UnsupportedKeyPositionTest {
+
+ @Test
+ public void assertInit() {
+ UnsupportedKeyPosition position = (UnsupportedKeyPosition) PrimaryKeyPositionFactory.newInstance("u,,");
+ assertNull(position.getBeginValue());
+ assertNull(position.getEndValue());
+ }
+
+ @Test
+ public void assertToString() {
+ assertThat(new UnsupportedKeyPosition().toString(), is("u,,"));
+ }
+}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 52b48485a2b..58f4796b3f1 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
@@ -122,15 +121,16 @@ public final class InventoryTaskSplitterTest {
assertThat(actual.size(), is(1));
}
- @Test(expected = SplitPipelineJobByRangeException.class)
- public void assertSplitInventoryDataWithIllegalKeyDataType() throws SQLException, ReflectiveOperationException {
+ @Test
+ public void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException, ReflectiveOperationException {
initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
InventoryDumperConfiguration dumperConfig = (InventoryDumperConfiguration) Plugins.getMemberAccessor()
.get(InventoryTaskSplitter.class.getDeclaredField("dumperConfig"), inventoryTaskSplitter);
assertNotNull(dumperConfig);
dumperConfig.setUniqueKey("order_id,user_id");
dumperConfig.setUniqueKeyDataType(Integer.MIN_VALUE);
- inventoryTaskSplitter.splitInventoryData(jobItemContext);
+ List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobItemContext);
+ assertThat(actual.size(), is(1));
}
@Test