You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/22 07:31:12 UTC
[shardingsphere] branch master updated: Refactor migration job and related impl with standalone source resource (#20316)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 203606c877f Refactor migration job and related impl with standalone source resource (#20316)
203606c877f is described below
commit 203606c877ff5f65620788dac48ee27df25fe6b3
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Aug 22 15:31:03 2022 +0800
Refactor migration job and related impl with standalone source resource (#20316)
---
...hardingRuleAlteredJobConfigurationPreparer.java | 255 --------------------
.../handler/update/MigrateTableUpdater.java | 14 +-
.../distsql/statement/MigrateTableStatement.java | 2 +
.../ShardingSpherePipelineDataSourceCreator.java | 3 +-
.../data/pipeline/api/MigrationJobPublicAPI.java | 8 +
.../pipeline/api/config/ImporterConfiguration.java | 3 +-
.../api/config/ingest/DumperConfiguration.java | 2 +-
.../ingest/InventoryDumperConfiguration.java | 1 -
.../api/config/job/MigrationJobConfiguration.java | 38 +--
.../api/config/job/PipelineJobConfiguration.java | 2 +-
.../job/yaml/YamlMigrationJobConfiguration.java | 26 +--
.../yaml/YamlMigrationJobConfigurationSwapper.java | 14 +-
.../job/yaml/YamlPipelineJobConfiguration.java | 2 +-
.../api/pojo/CreateMigrationJobParameter.java} | 24 +-
.../spi/ingest/position/PositionInitializer.java | 6 +-
.../RuleAlteredJobConfigurationPreparer.java | 47 ----
...RuleAlteredJobConfigurationPreparerFactory.java | 44 ----
.../api/config/MigrationJobConfigurationTest.java | 15 +-
...RuleAlteredJobConfigurationPreparerFixture.java | 36 ---
.../check/consistency/DataConsistencyChecker.java | 12 +-
...DataMatchDataConsistencyCalculateAlgorithm.java | 4 +
.../pipeline/core/execute/PipelineJobWorker.java | 5 -
.../core/prepare/PipelineJobPreparerUtils.java | 9 +-
.../datasource/AbstractDataSourcePreparer.java | 1 +
.../data/pipeline/core/util/SchemaTableUtil.java | 93 ++++++++
.../scenario/migration/MigrationJobAPIImpl.java | 172 ++++++++++----
.../scenario/migration/MigrationJobId.java | 7 +-
.../scenario/migration/MigrationJobPreparer.java | 33 +--
.../scenario/rulealtered/RuleAlteredJobWorker.java | 246 --------------------
.../core/fixture/MigrationJobAPIFixture.java | 5 +
.../fixture/FixturePositionInitializer.java | 2 +-
.../pipeline/core/job/PipelineJobIdUtilsTest.java | 5 +-
.../mysql/ingest/MySQLPositionInitializer.java | 2 +-
.../mysql/ingest/MySQLPositionInitializerTest.java | 2 +-
.../datasource/MySQLDataSourcePreparerTest.java | 3 -
.../ingest/OpenGaussPositionInitializer.java | 27 +--
.../opengauss/ingest/OpenGaussWalDumper.java | 2 +-
.../ingest/PostgreSQLPositionInitializer.java | 17 +-
.../postgresql/ingest/PostgreSQLWalDumper.java | 3 +-
.../ingest/PostgreSQLPositionInitializerTest.java | 12 +-
.../postgresql/ingest/PostgreSQLWalDumperTest.java | 22 +-
.../rdl/rule/RuleDefinitionBackendHandler.java | 8 +-
.../pipeline/cases/base/BaseExtraSQLITCase.java | 39 +---
.../data/pipeline/cases/base/BaseITCase.java | 257 ++++++++++-----------
...QLCommand.java => MigrationDistSQLCommand.java} | 38 ++-
...ScalingIT.java => MySQLMigrationGeneralIT.java} | 59 ++---
...ngIT.java => PostgreSQLMigrationGeneralIT.java} | 63 ++---
...alingIT.java => TextPrimaryKeyMigrationIT.java} | 49 ++--
.../pipeline/cases/task/MySQLIncrementTask.java | 2 +-
.../cases/task/PostgreSQLIncrementTask.java | 16 +-
...tainer.java => MigrationComposedContainer.java} | 4 +-
.../pipeline/framework/watcher/ScalingWatcher.java | 14 +-
.../src/test/resources/env/common/command.xml | 103 +++------
.../src/test/resources/logback-test.xml | 1 +
.../core/api/impl/MigrationJobAPIImplTest.java | 13 +-
.../consistency/DataConsistencyCheckerTest.java | 4 +-
.../core/fixture/FixturePositionInitializer.java | 2 +-
.../pipeline/core/task/IncrementalTaskTest.java | 2 +-
.../core/util/JobConfigurationBuilder.java | 19 +-
.../migration_sharding_sphere_jdbc_target.yaml | 57 +++++
.../resources/migration_standard_jdbc_source.yaml | 29 +++
61 files changed, 766 insertions(+), 1239 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
deleted file mode 100644
index e611ae7c40f..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.sharding.data.pipeline;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.strategy.sharding.ComplexShardingStrategyConfiguration;
-import org.apache.shardingsphere.sharding.api.config.strategy.sharding.ShardingStrategyConfiguration;
-import org.apache.shardingsphere.sharding.api.config.strategy.sharding.StandardShardingStrategyConfiguration;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
-import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-/**
- * Sharding rule altered job configuration preparer.
- */
-@Slf4j
-public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
-
- @Override
- public void extendJobConfiguration(final YamlMigrationJobConfiguration yamlJobConfig) {
- Map<String, List<DataNode>> actualDataNodes = getActualDataNodes(new YamlMigrationJobConfigurationSwapper().swapToObject(yamlJobConfig));
- yamlJobConfig.setJobShardingDataNodes(getJobShardingDataNodes(actualDataNodes));
- yamlJobConfig.setLogicTables(getLogicTables(actualDataNodes.keySet()));
- yamlJobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(actualDataNodes));
- yamlJobConfig.setSchemaTablesMap(getSchemaTablesMap(yamlJobConfig.getDatabaseName(), actualDataNodes.keySet()));
- }
-
- private static Map<String, List<DataNode>> getActualDataNodes(final MigrationJobConfiguration jobConfig) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
- ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
- ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
- // TODO InstanceContext should not null
- ShardingRule shardingRule = new ShardingRule(sourceRuleConfig, source.getRootConfig().getDataSources().keySet(), null);
- Map<String, TableRule> tableRules = shardingRule.getTableRules();
- Map<String, List<DataNode>> result = new LinkedHashMap<>();
- Set<String> reShardNeededTables = new HashSet<>(jobConfig.getAlteredRuleYamlClassNameTablesMap().get(YamlShardingRuleConfiguration.class.getName()));
- for (Entry<String, TableRule> entry : tableRules.entrySet()) {
- if (reShardNeededTables.contains(entry.getKey())) {
- result.put(entry.getKey(), entry.getValue().getActualDataNodes());
- }
- }
- return result;
- }
-
- private List<String> getJobShardingDataNodes(final Map<String, List<DataNode>> actualDataNodes) {
- List<String> result = new LinkedList<>();
- Map<String, Map<String, List<DataNode>>> groupedDataSourceDataNodesMap = groupDataSourceDataNodesMapByDataSourceName(actualDataNodes);
- for (Map<String, List<DataNode>> each : groupedDataSourceDataNodesMap.values()) {
- List<JobDataNodeEntry> dataNodeEntries = new ArrayList<>(each.size());
- for (Entry<String, List<DataNode>> entry : each.entrySet()) {
- dataNodeEntries.add(new JobDataNodeEntry(entry.getKey(), entry.getValue()));
- }
- result.add(new JobDataNodeLine(dataNodeEntries).marshal());
- }
- return result;
- }
-
- private Map<String, Map<String, List<DataNode>>> groupDataSourceDataNodesMapByDataSourceName(final Map<String, List<DataNode>> actualDataNodes) {
- Map<String, Map<String, List<DataNode>>> result = new LinkedHashMap<>();
- for (Entry<String, List<DataNode>> entry : actualDataNodes.entrySet()) {
- for (DataNode each : entry.getValue()) {
- Map<String, List<DataNode>> groupedDataNodesMap = result.computeIfAbsent(each.getDataSourceName(), key -> new LinkedHashMap<>());
- groupedDataNodesMap.computeIfAbsent(entry.getKey(), key -> new LinkedList<>()).add(each);
- }
- }
- return result;
- }
-
- private static String getLogicTables(final Set<String> logicTables) {
- return String.join(",", logicTables);
- }
-
- private static String getTablesFirstDataNodes(final Map<String, List<DataNode>> actualDataNodes) {
- List<JobDataNodeEntry> dataNodeEntries = new ArrayList<>(actualDataNodes.size());
- for (Entry<String, List<DataNode>> entry : actualDataNodes.entrySet()) {
- dataNodeEntries.add(new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1)));
- }
- return new JobDataNodeLine(dataNodeEntries).marshal();
- }
-
- private static Map<String, List<String>> getSchemaTablesMap(final String databaseName, final Set<String> logicTables) {
- // TODO get by search_path
- ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
- Map<String, List<String>> result = new LinkedHashMap<>();
- database.getSchemas().forEach((schemaName, schema) -> {
- for (String each : schema.getAllTableNames()) {
- if (!logicTables.contains(each)) {
- continue;
- }
- result.computeIfAbsent(schemaName, unused -> new LinkedList<>()).add(each);
- }
- });
- log.info("getSchemaTablesMap, result={}", result);
- return result;
- }
-
- @Override
- public TaskConfiguration createTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
- ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
- ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
- Map<String, DataSourceProperties> dataSourcePropsMap = new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
- JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
- String dataSourceName = dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
- Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
- for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
- for (DataNode dataNode : each.getDataNodes()) {
- tableNameMap.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName()));
- }
- }
- TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
- DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getDatabaseName(), dataSourceName,
- dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableNameMap, tableNameSchemaNameMapping);
- Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfiguration(jobConfig);
- Set<LogicTableName> reShardNeededTables = jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
- Map<LogicTableName, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig), reShardNeededTables);
- ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, tableNameSchemaNameMapping);
- TaskConfiguration result = new TaskConfiguration(dumperConfig, importerConfig);
- log.info("createTaskConfiguration, dataSourceName={}, result={}", dataSourceName, result);
- return result;
- }
-
- private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final MigrationJobConfiguration jobConfig) {
- return (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
- }
-
- private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final MigrationJobConfiguration jobConfig) {
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
- if (!(targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration)) {
- return Optional.empty();
- }
- ShardingSpherePipelineDataSourceConfiguration target = (ShardingSpherePipelineDataSourceConfiguration) targetDataSourceConfig;
- return Optional.of(ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(target.getRootConfig().getRules()));
- }
-
- private static Map<LogicTableName, Set<String>> getShardingColumnsMap(final ShardingRuleConfiguration shardingRuleConfig, final Set<LogicTableName> reShardNeededTables) {
- Set<String> defaultDatabaseShardingColumns = extractShardingColumns(shardingRuleConfig.getDefaultDatabaseShardingStrategy());
- Set<String> defaultTableShardingColumns = extractShardingColumns(shardingRuleConfig.getDefaultTableShardingStrategy());
- Map<LogicTableName, Set<String>> result = new ConcurrentHashMap<>();
- for (ShardingTableRuleConfiguration each : shardingRuleConfig.getTables()) {
- LogicTableName logicTableName = new LogicTableName(each.getLogicTable());
- if (!reShardNeededTables.contains(logicTableName)) {
- continue;
- }
- Set<String> shardingColumns = new HashSet<>();
- shardingColumns.addAll(null == each.getDatabaseShardingStrategy() ? defaultDatabaseShardingColumns : extractShardingColumns(each.getDatabaseShardingStrategy()));
- shardingColumns.addAll(null == each.getTableShardingStrategy() ? defaultTableShardingColumns : extractShardingColumns(each.getTableShardingStrategy()));
- result.put(logicTableName, shardingColumns);
- }
- for (ShardingAutoTableRuleConfiguration each : shardingRuleConfig.getAutoTables()) {
- LogicTableName logicTableName = new LogicTableName(each.getLogicTable());
- if (!reShardNeededTables.contains(logicTableName)) {
- continue;
- }
- ShardingStrategyConfiguration shardingStrategy = each.getShardingStrategy();
- Set<String> shardingColumns = new HashSet<>(extractShardingColumns(shardingStrategy));
- result.put(logicTableName, shardingColumns);
- }
- return result;
- }
-
- private static Set<String> extractShardingColumns(final ShardingStrategyConfiguration shardingStrategy) {
- if (shardingStrategy instanceof StandardShardingStrategyConfiguration) {
- return new HashSet<>(Collections.singleton(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn()));
- }
- if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
- return new HashSet<>(Arrays.asList(((ComplexShardingStrategyConfiguration) shardingStrategy).getShardingColumns().split(",")));
- }
- return Collections.emptySet();
- }
-
- private static DumperConfiguration createDumperConfiguration(final String databaseName, final String dataSourceName, final Map<String, Object> props,
- final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- DumperConfiguration result = new DumperConfiguration();
- result.setDatabaseName(databaseName);
- result.setDataSourceName(dataSourceName);
- result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(YamlEngine.marshal(props)));
- result.setTableNameMap(tableNameMap);
- result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
- return result;
- }
-
- private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
- final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
- int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- int retryTimes = jobConfig.getRetryTimes();
- int concurrency = jobConfig.getConcurrency();
- return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
- }
-
- private static Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
- Map<LogicTableName, Set<String>> result = new HashMap<>(shardingColumnsMap.size());
- for (Entry<LogicTableName, Set<String>> entry : shardingColumnsMap.entrySet()) {
- result.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
- }
- return Collections.unmodifiableMap(result);
- }
-}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index b90a65aa39d..45771d65713 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -17,22 +17,32 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
+import org.apache.shardingsphere.sharding.yaml.swapper.YamlShardingRuleConfigurationSwapper;
/**
* Migrate table updater.
*/
+@Slf4j
public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {
private static final MigrationJobPublicAPI JOB_API = PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
+ private static final YamlShardingRuleConfigurationSwapper SHARDING_RULE_CONFIG_SWAPPER = new YamlShardingRuleConfigurationSwapper();
+
@Override
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
- // TODO implement migrate table
- JOB_API.getType();
+ log.info("start migrate job by {}", sqlStatement);
+ String targetDatabaseName = ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), databaseName);
+ CreateMigrationJobParameter createMigrationJobParameter = new CreateMigrationJobParameter(sqlStatement.getSourceDatabaseName(), sqlStatement.getSourceTableName(),
+ targetDatabaseName, sqlStatement.getTargetTableName());
+ JOB_API.createJobAndStart(createMigrationJobParameter);
}
@Override
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java
index 56d4c5bae8c..c03314798e1 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/MigrateTableStatement.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.migration.distsql.statement;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.ToString;
import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableScalingRALStatement;
/**
@@ -26,6 +27,7 @@ import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.UpdatableS
*/
@RequiredArgsConstructor
@Getter
+@ToString
public final class MigrateTableStatement extends UpdatableScalingRALStatement {
private final String sourceDatabaseName;
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index 908b5460d2f..c3ecd5cecc2 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -44,11 +44,10 @@ public final class ShardingSpherePipelineDataSourceCreator implements PipelineDa
YamlRootConfiguration rootConfig = (YamlRootConfiguration) pipelineDataSourceConfig;
YamlShardingRuleConfiguration shardingRuleConfig = ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
enableRangeQueryForInline(shardingRuleConfig);
- String databaseName = null;
Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources(), false);
Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rootConfig.getRules());
try {
- return ShardingSphereDataSourceFactory.createDataSource(databaseName, dataSourceMap, ruleConfigs, null);
+ return ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), dataSourceMap, ruleConfigs, null);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index a31bd8aa037..a1f4045dced 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
@@ -126,4 +127,11 @@ public interface MigrationJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI
* @param resourceNames resource names
*/
void dropMigrationSourceResources(Collection<String> resourceNames);
+
+ /**
+ * Create job migration config and start.
+ *
+ * @param parameter create migration job parameter
+ */
+ void createJobAndStart(CreateMigrationJobParameter parameter);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
index a5dbba84466..d451719ecf2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ImporterConfiguration.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -71,7 +72,7 @@ public final class ImporterConfiguration {
* @return sharding columns
*/
public Set<String> getShardingColumns(final String logicTableName) {
- return shardingColumnsMap.get(new LogicTableName(logicTableName));
+ return ObjectUtils.defaultIfNull(shardingColumnsMap.get(new LogicTableName(logicTableName)), Collections.emptySet());
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 20e32724c98..e47aa2be450 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -38,7 +38,7 @@ import java.util.Map;
// TODO fields final
public class DumperConfiguration {
- private String databaseName;
+ private String jobId;
private String dataSourceName;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index c1f33e2cc54..9340b0e7208 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -46,7 +46,6 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
private JobRateLimitAlgorithm rateLimitAlgorithm;
public InventoryDumperConfiguration(final DumperConfiguration dumperConfig) {
- setDatabaseName(dumperConfig.getDatabaseName());
setDataSourceName(dumperConfig.getDataSourceName());
setDataSourceConfig(dumperConfig.getDataSourceConfig());
setTableNameMap(dumperConfig.getTableNameMap());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
index d3d77cb0601..e55cb062886 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/MigrationJobConfiguration.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.api.config.job;
-import com.google.common.base.Splitter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -32,15 +32,14 @@ import java.util.Map;
@RequiredArgsConstructor
@Getter
@Slf4j
+@ToString(exclude = {"source", "target", "schemaTablesMap"})
public final class MigrationJobConfiguration implements PipelineJobConfiguration {
private final String jobId;
- private final String databaseName;
+ private final String targetDatabaseName;
- private final Integer activeVersion;
-
- private final Integer newVersion;
+ private final String sourceDataSourceName;
private final String sourceDatabaseType;
@@ -50,17 +49,14 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
private final PipelineDataSourceConfiguration target;
- /**
- * Map{altered rule yaml class name, re-shard needed table names}.
- */
- private final Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-
/**
* Map{schema name, logic table names}.
*/
private final Map<String, List<String>> schemaTablesMap;
- private final String logicTables;
+ private final String sourceTableName;
+
+ private final String targetTableName;
/**
* Collection of each logic table's first data node.
@@ -83,24 +79,6 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
* @return job sharding count
*/
public int getJobShardingCount() {
- return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
- }
-
- /**
- * Split {@linkplain #logicTables} to logic table names.
- *
- * @return logic table names
- */
- public List<String> splitLogicTableNames() {
- return Splitter.on(',').splitToList(logicTables);
- }
-
- @Override
- public String toString() {
- return "MigrationJobConfiguration{"
- + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
- + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
- + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
- + '}';
+ return 1;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
index f1f97320b0c..336c996e0eb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
@@ -34,7 +34,7 @@ public interface PipelineJobConfiguration {
*
* @return database name
*/
- String getDatabaseName();
+ String getTargetDatabaseName();
/**
* Get job sharding count.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
index 3971d8b7087..9d17d6817c6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.config.job.yaml;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -32,15 +33,14 @@ import java.util.Map;
@Getter
@Setter
@Slf4j
+@ToString(exclude = {"source", "target", "schemaTablesMap"})
public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfiguration {
private String jobId;
- private String databaseName;
+ private String targetDatabaseName;
- private Integer activeVersion;
-
- private Integer newVersion;
+ private String sourceDataSourceName;
private String sourceDatabaseType;
@@ -50,17 +50,14 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
private YamlPipelineDataSourceConfiguration target;
- /**
- * Map{altered rule yaml class name, re-shard needed table names}.
- */
- private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-
/**
* Map{schema name, logic table names}.
*/
private Map<String, List<String>> schemaTablesMap;
- private String logicTables;
+ private String sourceTableName;
+
+ private String targetTableName;
/**
* Collection of each logic table's first data node.
@@ -102,13 +99,4 @@ public final class YamlMigrationJobConfiguration implements YamlPipelineJobConfi
Preconditions.checkNotNull(yamlConfig.getType());
Preconditions.checkNotNull(yamlConfig.getParameter());
}
-
- @Override
- public String toString() {
- return "YamlMigrationJobConfiguration{"
- + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
- + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion
- + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
- + '}';
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
index 9c28e438ccd..31c7e95f356 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlMigrationJobConfigurationSwapper.java
@@ -36,16 +36,15 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
public YamlMigrationJobConfiguration swapToYamlConfiguration(final MigrationJobConfiguration data) {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
result.setJobId(data.getJobId());
- result.setDatabaseName(data.getDatabaseName());
- result.setActiveVersion(data.getActiveVersion());
- result.setNewVersion(data.getNewVersion());
+ result.setTargetDatabaseName(data.getTargetDatabaseName());
result.setSourceDatabaseType(data.getSourceDatabaseType());
+ result.setSourceTableName(data.getSourceTableName());
+ result.setSourceDataSourceName(data.getSourceDataSourceName());
result.setTargetDatabaseType(data.getTargetDatabaseType());
result.setSource(dataSourceConfigSwapper.swapToYamlConfiguration(data.getSource()));
result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
- result.setAlteredRuleYamlClassNameTablesMap(data.getAlteredRuleYamlClassNameTablesMap());
result.setSchemaTablesMap(data.getSchemaTablesMap());
- result.setLogicTables(data.getLogicTables());
+ result.setTargetTableName(data.getTargetTableName());
result.setTablesFirstDataNodes(data.getTablesFirstDataNodes());
result.setJobShardingDataNodes(data.getJobShardingDataNodes());
result.setConcurrency(data.getConcurrency());
@@ -55,11 +54,10 @@ public final class YamlMigrationJobConfigurationSwapper implements YamlConfigura
@Override
public MigrationJobConfiguration swapToObject(final YamlMigrationJobConfiguration yamlConfig) {
- return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getDatabaseName(),
- yamlConfig.getActiveVersion(), yamlConfig.getNewVersion(),
+ return new MigrationJobConfiguration(yamlConfig.getJobId(), yamlConfig.getTargetDatabaseName(), yamlConfig.getSourceDataSourceName(),
yamlConfig.getSourceDatabaseType(), yamlConfig.getTargetDatabaseType(),
dataSourceConfigSwapper.swapToObject(yamlConfig.getSource()), dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),
- yamlConfig.getAlteredRuleYamlClassNameTablesMap(), yamlConfig.getSchemaTablesMap(), yamlConfig.getLogicTables(),
+ yamlConfig.getSchemaTablesMap(), yamlConfig.getSourceTableName(), yamlConfig.getTargetTableName(),
yamlConfig.getTablesFirstDataNodes(), yamlConfig.getJobShardingDataNodes(),
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
index c0ab0ca640b..89b0c9716dd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlPipelineJobConfiguration.java
@@ -36,5 +36,5 @@ public interface YamlPipelineJobConfiguration extends YamlConfiguration {
*
* @return database name
*/
- String getDatabaseName();
+ String getTargetDatabaseName();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactoryTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateMigrationJobParameter.java
similarity index 59%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactoryTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateMigrationJobParameter.java
index 5fdccecd524..b7183d87668 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactoryTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/CreateMigrationJobParameter.java
@@ -15,18 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
-import org.apache.shardingsphere.data.pipeline.spi.fixture.RuleAlteredJobConfigurationPreparerFixture;
-import org.junit.Test;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-
-public final class RuleAlteredJobConfigurationPreparerFactoryTest {
+@Data
+@RequiredArgsConstructor
+public final class CreateMigrationJobParameter {
+
+ private final String sourceResourceName;
+
+ private final String sourceTableName;
+
+ private final String targetDatabaseName;
- @Test
- public void assertGetInstance() {
- assertThat(RuleAlteredJobConfigurationPreparerFactory.getInstance(), instanceOf(RuleAlteredJobConfigurationPreparerFixture.class));
- }
+ private final String targetTableName;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
index 05b9a390718..9ec5cf0d1c9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
@@ -34,10 +34,11 @@ public interface PositionInitializer extends TypedSPI {
* Init position by data source.
*
* @param dataSource data source
+ * @param slotNameSuffix slot name suffix
* @return position
* @throws SQLException SQL exception
*/
- IngestPosition<?> init(DataSource dataSource) throws SQLException;
+ IngestPosition<?> init(DataSource dataSource, String slotNameSuffix) throws SQLException;
/**
* Init position by string data.
@@ -51,8 +52,9 @@ public interface PositionInitializer extends TypedSPI {
* Clean up by data source if necessary.
*
* @param dataSource data source
+ * @param slotNameSuffix slot name suffix
* @throws SQLException SQL exception
*/
- default void destroy(DataSource dataSource) throws SQLException {
+ default void destroy(DataSource dataSource, final String slotNameSuffix) throws SQLException {
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
deleted file mode 100644
index 2137bfd6f82..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
-
-/**
- * Rule altered job configuration preparer.
- */
-public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI {
-
- /**
- * Extend job configuration.
- *
- * @param yamlJobConfig YAML job configuration
- */
- void extendJobConfiguration(YamlMigrationJobConfiguration yamlJobConfig);
-
- /**
- * Create task configuration, used by underlying scheduler.
- *
- * @param jobConfig job configuration
- * @param jobShardingItem job sharding item
- * @param pipelineProcessConfig pipeline process configuration
- * @return task configuration
- */
- TaskConfiguration createTaskConfiguration(MigrationJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactory.java
deleted file mode 100644
index 6d26d29fd48..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparerFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
-
-/**
- * Rule altered job configuration preparer factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class RuleAlteredJobConfigurationPreparerFactory {
-
- static {
- ShardingSphereServiceLoader.register(RuleAlteredJobConfigurationPreparer.class);
- }
-
- /**
- * Get instance of rule altered job configuration preparer.
- *
- * @return got instance
- */
- // TODO RuleAlteredJobConfigurationPreparer should be TypedSPI or OrderedSPI
- public static RuleAlteredJobConfigurationPreparer getInstance() {
- return RequiredSPIRegistry.getRegisteredService(RuleAlteredJobConfigurationPreparer.class);
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
index 504ce956e8a..aaac7418886 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/MigrationJobConfigurationTest.java
@@ -22,8 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.junit.Test;
-import java.util.Arrays;
-
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -35,22 +33,13 @@ public final class MigrationJobConfigurationTest {
public void assertGetJobShardingCountByNull() {
YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
- assertThat(jobConfig.getJobShardingCount(), is(0));
+ assertThat(jobConfig.getJobShardingCount(), is(1));
}
@Test
public void assertGetJobShardingCount() {
YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
- yamlJobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
- MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
- assertThat(jobConfig.getJobShardingCount(), is(2));
- }
-
- @Test
- public void assertSplitLogicTableNames() {
- YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
- yamlJobConfig.setLogicTables("foo_tbl,bar_tbl");
MigrationJobConfiguration jobConfig = JOB_CONFIG_SWAPPER.swapToObject(yamlJobConfig);
- assertThat(jobConfig.splitLogicTableNames(), is(Arrays.asList("foo_tbl", "bar_tbl")));
+ assertThat(jobConfig.getJobShardingCount(), is(1));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
deleted file mode 100644
index ab73dbdbefe..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.fixture;
-
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-
-public final class RuleAlteredJobConfigurationPreparerFixture implements RuleAlteredJobConfigurationPreparer {
-
- @Override
- public void extendJobConfiguration(final YamlMigrationJobConfiguration yamlJobConfig) {
- }
-
- @Override
- public TaskConfiguration createTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
- return null;
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 98987fcc9f5..50edd3ca848 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
@@ -32,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -51,6 +53,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -64,6 +67,7 @@ import java.util.concurrent.TimeUnit;
/**
* Data consistency checker.
*/
+@Slf4j
public final class DataConsistencyChecker {
// TODO remove jobConfig for common usage
@@ -77,8 +81,10 @@ public final class DataConsistencyChecker {
public DataConsistencyChecker(final MigrationJobConfiguration jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
this.jobConfig = jobConfig;
- logicTableNames = jobConfig.splitLogicTableNames();
- tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
+ logicTableNames = Collections.singletonList(jobConfig.getTargetTableName());
+ // TODO need to get from actual data source.
+ Map<String, List<String>> schemaTablesMap = SchemaTableUtil.getSchemaTablesMap(jobConfig.getTargetDatabaseName(), Collections.singleton(jobConfig.getTargetTableName()));
+ tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(schemaTablesMap));
this.readRateLimitAlgorithm = readRateLimitAlgorithm;
}
@@ -166,7 +172,7 @@ public final class DataConsistencyChecker {
String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
for (String each : logicTableNames) {
- ShardingSphereTable table = getTableMetaData(jobConfig.getDatabaseName(), each);
+ ShardingSphereTable table = getTableMetaData(jobConfig.getTargetDatabaseName(), each);
if (null == table) {
throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 8ca2576d731..f1437b7d125 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -188,6 +188,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
if (thisResult instanceof SQLXML && thatResult instanceof SQLXML) {
return ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
}
+ // TODO The standard MySQL JDBC will convert unsigned mediumint to Integer, but the proxy converts it to Long
+ if (thisResult instanceof Integer && thatResult instanceof Long) {
+ return ((Integer) thisResult).longValue() == (Long) thatResult;
+ }
if (!new EqualsBuilder().append(thisResult, thatResult).isEquals()) {
log.warn("record column value not match, value1={}, value2={}, record1={}, record2={}", thisResult, thatResult, thisNext, thatNext);
return false;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
index 860fad8976f..8ddbbadd35a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
@@ -18,9 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,8 +41,6 @@ public final class PipelineJobWorker {
return;
}
log.info("start worker initialization");
- EventBusContext eventBusContext = PipelineContext.getContextManager().getInstanceContext().getEventBusContext();
- eventBusContext.register(RuleAlteredJobWorker.getInstance());
new FinishedCheckJobExecutor().start();
new PipelineJobExecutor().start();
WORKER_INITIALIZED.set(true);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 3b364c51da6..327e16cd324 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -101,7 +101,7 @@ public final class PipelineJobPreparerUtils {
}
String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- return PositionInitializerFactory.getInstance(databaseType).init(dataSource);
+ return PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
}
/**
@@ -141,10 +141,11 @@ public final class PipelineJobPreparerUtils {
/**
* Cleanup job preparer.
*
+ * @param jobId job id
* @param pipelineDataSourceConfig pipeline data source config
* @throws SQLException sql exception
*/
- public static void destroyPosition(final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
+ public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
PositionInitializer positionInitializer = PositionInitializerFactory.getInstance(databaseType.getType());
log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
@@ -152,7 +153,7 @@ public final class PipelineJobPreparerUtils {
ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(dataSourceConfig.getRootConfig()).values()) {
try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
- positionInitializer.destroy(dataSource);
+ positionInitializer.destroy(dataSource, jobId);
}
}
}
@@ -161,7 +162,7 @@ public final class PipelineJobPreparerUtils {
try (
PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
DataSourcePoolCreator.create((DataSourceProperties) dataSourceConfig.getDataSourceConfiguration()), databaseType)) {
- positionInitializer.destroy(dataSource);
+ positionInitializer.destroy(dataSource, jobId);
}
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 9d19924716b..e503a4147cd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -103,6 +103,7 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
try (Statement statement = targetConnection.createStatement()) {
statement.execute(sql);
} catch (final SQLException ex) {
+ log.warn("execute target table sql failed", ex);
for (String ignoreMessage : IGNORE_EXCEPTION_MESSAGE) {
if (ex.getMessage().contains(ignoreMessage)) {
return;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
new file mode 100644
index 00000000000..fe42737b2b7
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
+import org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Schema table util.
+ */
+@Slf4j
+public final class SchemaTableUtil {
+
+    private SchemaTableUtil() {
+    }
+
+    /**
+     * Get schema table map.
+     *
+     * <p>Only schemas containing at least one of the requested tables appear in the result.</p>
+     *
+     * @param databaseName database name
+     * @param logicTables logic tables
+     * @return schema table map
+     */
+    public static Map<String, List<String>> getSchemaTablesMap(final String databaseName, final Set<String> logicTables) {
+        // TODO get by search_path
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
+        Map<String, List<String>> result = new LinkedHashMap<>();
+        database.getSchemas().forEach((schemaName, schema) -> {
+            for (String each : schema.getAllTableNames()) {
+                if (!logicTables.contains(each)) {
+                    continue;
+                }
+                result.computeIfAbsent(schemaName, unused -> new LinkedList<>()).add(each);
+            }
+        });
+        log.info("getSchemaTablesMap, result={}", result);
+        return result;
+    }
+
+    /**
+     * Get schema tables map from actual data source.
+     *
+     * <p>A data source is created from the given configuration for the lookup and closed again before returning.</p>
+     *
+     * @param pipelineDataSourceConfig pipeline data source config
+     * @param tableName table name
+     * @return schema tables map
+     * @throws AddMigrationSourceResourceException when the metadata lookup fails with a SQL exception
+     */
+    public static Map<String, List<String>> getSchemaTablesMapFromActual(final PipelineDataSourceConfiguration pipelineDataSourceConfig, final String tableName) {
+        Map<String, List<String>> result = new HashMap<>();
+        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig)) {
+            try (Connection connection = dataSource.getConnection()) {
+                DatabaseMetaData metaData = connection.getMetaData();
+                // NOTE(review): getTables takes a name pattern, so '_' or '%' inside tableName act as wildcards; confirm whether escaping is needed
+                try (ResultSet resultSet = metaData.getTables(null, null, tableName, new String[]{"TABLE"})) {
+                    while (resultSet.next()) {
+                        // TABLE_SCHEM may be null according to the JDBC contract; HashMap accepts the null key
+                        String schemaName = resultSet.getString("TABLE_SCHEM");
+                        result.computeIfAbsent(schemaName, k -> new ArrayList<>()).add(resultSet.getString("TABLE_NAME"));
+                    }
+                }
+            }
+        } catch (final SQLException ex) {
+            log.error("Get schema name map error", ex);
+            throw new AddMigrationSourceResourceException(ex.getMessage());
+        }
+        return result;
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index fb01beb0a6f..a7908548509 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -19,25 +19,36 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
@@ -49,33 +60,40 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
import org.apache.shardingsphere.data.pipeline.core.exception.DropMigrationSourceResourceException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
+import javax.sql.DataSource;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import java.util.stream.Stream;
/**
* Migration job API impl.
@@ -83,6 +101,12 @@ import java.util.stream.Stream;
@Slf4j
public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implements MigrationJobAPI {
+ private static final YamlRuleConfigurationSwapperEngine RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
+
+ private static final YamlDataSourceConfigurationSwapper DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
+
+ private static final YamlPipelineDataSourceConfigurationSwapper PIPELINE_DATA_SOURCE_CONFIG_SWAPPER = new YamlPipelineDataSourceConfigurationSwapper();
+
private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
@@ -95,16 +119,13 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
MigrationJobId jobId = (MigrationJobId) pipelineJobId;
- String text = jobId.getFormatVersion() + "|" + jobId.getCurrentMetadataVersion() + "T" + jobId.getNewMetadataVersion() + "|" + jobId.getDatabaseName();
- return Hex.encodeHexString(text.getBytes(StandardCharsets.UTF_8), true);
+ String text = jobId.getDatabaseName() + "|" + jobId.getTableName() + "|" + jobId.getSourceDataSourceName();
+ return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
}
@Override
public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
- if (null == config.getJobShardingDataNodes()) {
- RuleAlteredJobConfigurationPreparerFactory.getInstance().extendJobConfiguration(config);
- }
if (null == yamlJobConfig.getJobId()) {
config.setJobId(generateJobId(config));
}
@@ -116,15 +137,19 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getTarget().getType(), config.getTarget().getParameter());
config.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
}
+ JobDataNodeEntry nodeEntry = new JobDataNodeEntry(config.getSourceTableName(),
+ Collections.singletonList(new DataNode(config.getSourceDataSourceName(), config.getSourceTableName())));
+ String dataNodeLine = new JobDataNodeLine(Collections.singletonList(nodeEntry)).marshal();
+ config.setTablesFirstDataNodes(dataNodeLine);
+ config.setJobShardingDataNodes(Collections.singletonList(dataNodeLine));
}
private String generateJobId(final YamlMigrationJobConfiguration config) {
MigrationJobId jobId = new MigrationJobId();
jobId.setTypeCode(getJobType().getTypeCode());
- jobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
- jobId.setCurrentMetadataVersion(config.getActiveVersion());
- jobId.setNewMetadataVersion(config.getNewVersion());
- jobId.setDatabaseName(config.getDatabaseName());
+ jobId.setSourceDataSourceName(config.getSourceDataSourceName());
+ jobId.setTableName(config.getSourceTableName());
+ jobId.setDatabaseName(config.getTargetDatabaseName());
return marshalJobId(jobId);
}
@@ -144,7 +169,44 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
@Override
public TaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
- return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, jobShardingItem, pipelineProcessConfig);
+ Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
+ tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), new LogicTableName(jobConfig.getSourceTableName()));
+ Map<LogicTableName, String> tableNameSchemaMap = TableNameSchemaNameMapping.convert(SchemaTableUtil.getSchemaTablesMapFromActual(jobConfig.getSource(), jobConfig.getSourceTableName()));
+ TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(tableNameSchemaMap);
+ DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceDataSourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
+ // TODO shardingColumnsMap is always empty for now
+ ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, pipelineProcessConfig, Collections.emptyMap(), tableNameSchemaNameMapping);
+ TaskConfiguration result = new TaskConfiguration(dumperConfig, importerConfig);
+ log.info("createTaskConfiguration, dataSourceName={}, result={}", jobConfig.getSourceDataSourceName(), result);
+ return result;
+ }
+
+ private static DumperConfiguration createDumperConfiguration(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
+ final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ DumperConfiguration result = new DumperConfiguration();
+ result.setJobId(jobId);
+ result.setDataSourceName(dataSourceName);
+ result.setDataSourceConfig(sourceDataSource);
+ result.setTableNameMap(tableNameMap);
+ result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+ return result;
+ }
+
+ private static ImporterConfiguration createImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+ final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
+ int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+ int retryTimes = jobConfig.getRetryTimes();
+ int concurrency = jobConfig.getConcurrency();
+ return new ImporterConfiguration(dataSourceConfig, unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, retryTimes, concurrency);
+ }
+
+ private static Map<LogicTableName, Set<String>> unmodifiable(final Map<LogicTableName, Set<String>> shardingColumnsMap) {
+ Map<LogicTableName, Set<String>> result = new HashMap<>(shardingColumnsMap.size());
+ for (Entry<LogicTableName, Set<String>> entry : shardingColumnsMap.entrySet()) {
+ result.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
+ }
+ return Collections.unmodifiableMap(result);
}
@Override
@@ -154,17 +216,13 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig);
}
- private Stream<JobBriefInfo> getJobBriefInfos() {
- return PipelineAPIFactory.getJobStatisticsAPI().getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"));
- }
-
protected PipelineJobInfo getJobInfo(final String jobName) {
MigrationJobInfo result = new MigrationJobInfo(jobName);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
result.setActive(!jobConfigPOJO.isDisabled());
result.setShardingTotalCount(jobConfig.getJobShardingCount());
- result.setTables(jobConfig.getLogicTables());
+ result.setTables(jobConfig.getSourceTableName());
result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
result.setJobParameter(jobConfigPOJO.getJobParameter());
@@ -232,7 +290,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
@Override
public void stopClusterWriteDB(final MigrationJobConfiguration jobConfig) {
- String databaseName = jobConfig.getDatabaseName();
+ String databaseName = jobConfig.getTargetDatabaseName();
LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockDefinition lockDefinition = new ExclusiveLockDefinition(databaseName);
if (lockContext.isLocked(lockDefinition)) {
@@ -258,7 +316,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
@Override
public void restoreClusterWriteDB(final MigrationJobConfiguration jobConfig) {
- String databaseName = jobConfig.getDatabaseName();
+ String databaseName = jobConfig.getTargetDatabaseName();
LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
LockDefinition lockDefinition = new ExclusiveLockDefinition(databaseName);
if (lockContext.isLocked(lockDefinition)) {
@@ -370,23 +428,6 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
@Override
public void switchClusterConfiguration(final MigrationJobConfiguration jobConfig) {
- String jobId = jobConfig.getJobId();
- MigrationProcessContext processContext = buildPipelineProcessContext(jobConfig);
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- if (isDataConsistencyCheckNeeded(processContext)) {
- Optional<Boolean> checkResult = repositoryAPI.getJobCheckResult(jobId);
- if (!checkResult.isPresent() || !checkResult.get()) {
- throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
- }
- }
- ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(jobConfig.getDatabaseName(), jobConfig.getActiveVersion(), jobConfig.getNewVersion());
- PipelineContext.getContextManager().getInstanceContext().getEventBusContext().post(taskFinishedEvent);
- for (int each : repositoryAPI.getShardingItems(jobId)) {
- PipelineJobCenter.getJobItemContext(jobId, each).ifPresent(jobItemContext -> jobItemContext.setStatus(JobStatus.FINISHED));
- updateJobItemStatus(jobId, each, JobStatus.FINISHED);
- }
- PipelineJobCenter.stop(jobId);
- stop(jobId);
}
@Override
@@ -428,6 +469,75 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
dataSourcePersistService.persist(getJobType(), metaDataDataSource);
}
+    @Override
+    public void createJobAndStart(final CreateMigrationJobParameter parameter) {
+        YamlMigrationJobConfiguration yamlJobConfig = new YamlMigrationJobConfiguration();
+        // Source side: the standalone resource persisted for migration jobs under the requested name.
+        String sourceResourceName = parameter.getSourceResourceName();
+        DataSourceProperties sourceDataSourceProperties = dataSourcePersistService.load(JobType.MIGRATION).get(sourceResourceName);
+        if (null == sourceDataSourceProperties) {
+            // Map#get returns null for an unknown name; report that instead of passing null on to the swapper.
+            throw new IllegalArgumentException(String.format("Migration source resource `%s` does not exist", sourceResourceName));
+        }
+        Map<String, Object> sourceDataSourceProps = DATA_SOURCE_CONFIG_SWAPPER.swapToMap(sourceDataSourceProperties);
+        YamlPipelineDataSourceConfiguration sourcePipelineDataSourceConfiguration = createYamlPipelineDataSourceConfiguration(StandardPipelineDataSourceConfiguration.TYPE,
+                YamlEngine.marshal(sourceDataSourceProps));
+        yamlJobConfig.setSource(sourcePipelineDataSourceConfiguration);
+        yamlJobConfig.setSourceDatabaseType(new StandardPipelineDataSourceConfiguration(sourceDataSourceProps).getDatabaseType().getType());
+        yamlJobConfig.setSourceDataSourceName(sourceResourceName);
+        yamlJobConfig.setSourceTableName(parameter.getSourceTableName());
+        // Target side: data sources and rules of the target database, wrapped as a ShardingSphere pipeline data source.
+        String targetDatabaseName = parameter.getTargetDatabaseName();
+        ShardingSphereDatabase targetDatabase = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(targetDatabaseName);
+        Map<String, Map<String, Object>> targetDataSourceProperties = new HashMap<>();
+        for (Entry<String, DataSource> entry : targetDatabase.getResource().getDataSources().entrySet()) {
+            targetDataSourceProperties.put(entry.getKey(), DATA_SOURCE_CONFIG_SWAPPER.swapToMap(DataSourcePropertiesCreator.create(entry.getValue())));
+        }
+        YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(targetDatabaseName, targetDataSourceProperties, targetDatabase.getRuleMetaData().getConfigurations());
+        PipelineDataSourceConfiguration targetPipelineDataSource = new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+        yamlJobConfig.setTarget(createYamlPipelineDataSourceConfiguration(targetPipelineDataSource.getType(), YamlEngine.marshal(targetPipelineDataSource.getDataSourceConfiguration())));
+        yamlJobConfig.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
+        yamlJobConfig.setTargetDatabaseName(targetDatabaseName);
+        yamlJobConfig.setTargetTableName(parameter.getTargetTableName());
+        yamlJobConfig.setSchemaTablesMap(SchemaTableUtil.getSchemaTablesMapFromActual(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(sourcePipelineDataSourceConfiguration),
+                parameter.getSourceTableName()));
+        // Let the YAML configuration be completed before it is swapped into the runtime configuration and started.
+        extendYamlJobConfiguration(yamlJobConfig);
+        MigrationJobConfiguration jobConfiguration = new YamlMigrationJobConfigurationSwapper().swapToObject(yamlJobConfig);
+        start(jobConfiguration);
+    }
+
+    /**
+     * Build the YAML root configuration from a database name, its data sources and its rules.
+     *
+     * @param databaseName database name
+     * @param yamlDataSources data source properties keyed by data source name
+     * @param rules rule configurations to be swapped to their YAML form
+     * @return YAML root configuration
+     */
+    private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
+        YamlRootConfiguration result = new YamlRootConfiguration();
+        result.setDatabaseName(databaseName);
+        result.setDataSources(yamlDataSources);
+        Collection<YamlRuleConfiguration> yamlRuleConfigurations = RULE_CONFIG_SWAPPER_ENGINE.swapToYamlRuleConfigurations(rules);
+        result.setRules(yamlRuleConfigurations);
+        return result;
+    }
+
+    /**
+     * Create a YAML pipeline data source configuration holding the given type and parameter.
+     *
+     * @param type pipeline data source type
+     * @param parameter data source configuration text
+     * @return YAML pipeline data source configuration
+     */
+    private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final String type, final String parameter) {
+        YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
+        result.setType(type);
+        result.setParameter(parameter);
+        return result;
+    }
+
@Override
public String getType() {
return getJobType().getTypeName();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
index f204d9d774a..94d2bc1825b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobId.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.Getter;
-import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJobId;
@@ -33,9 +32,7 @@ public final class MigrationJobId extends AbstractPipelineJobId {
public static final String CURRENT_VERSION = "01";
- @NonNull
- private Integer currentMetadataVersion;
+ private String tableName;
- @NonNull
- private Integer newMetadataVersion;
+ private String sourceDataSourceName;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index eae08b224aa..0c47a28a51a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -42,8 +41,6 @@ import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datanode.DataNodes;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -52,11 +49,11 @@ import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
/**
* Migration job preparer.
@@ -140,22 +137,22 @@ public final class MigrationJobPreparer {
TableNameSchemaNameMapping tableNameSchemaNameMapping = jobItemContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
String targetDatabaseType = jobConfig.getTargetDatabaseType();
if (isSourceAndTargetSchemaAvailable(jobConfig)) {
- PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
- DatabaseTypeFactory.getInstance(targetDatabaseType), jobConfig.getDatabaseName(),
+ PrepareTargetSchemasParameter prepareTargetSchemasParameter = new PrepareTargetSchemasParameter(Collections.singletonList(jobConfig.getTargetTableName()),
+ DatabaseTypeFactory.getInstance(targetDatabaseType), jobConfig.getTargetDatabaseName(),
jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), jobItemContext.getDataSourceManager(), tableNameSchemaNameMapping);
PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, prepareTargetSchemasParameter);
}
ShardingSphereMetaData metaData = PipelineContext.getContextManager().getMetaDataContexts().getMetaData();
- ShardingSphereDatabase sphereDatabase = metaData.getDatabases().get(jobConfig.getDatabaseName());
+ ShardingSphereDatabase sphereDatabase = metaData.getDatabases().get(jobConfig.getTargetDatabaseName());
ShardingSphereSQLParserEngine sqlParserEngine = metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(sphereDatabase.getProtocolType().getType());
JobDataNodeLine jobDataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
Map<String, String> tableNameMap = new HashMap<>();
- for (JobDataNodeEntry each : jobDataNodeLine.getEntries()) {
- String actualTableName = getActualTable(sphereDatabase, each.getLogicTableName());
- tableNameMap.put(each.getLogicTableName(), actualTableName);
- }
- PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getDatabaseName(),
- jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), sphereDatabase.getResource().getDataSources(), jobItemContext.getDataSourceManager(),
+ tableNameMap.put(jobConfig.getTargetTableName(), jobConfig.getSourceTableName());
+ PipelineDataSourceWrapper dataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
+ Map<String, DataSource> sourceDataSourceMap = new HashMap<>(1, 1.0F);
+ sourceDataSourceMap.put(jobConfig.getSourceDataSourceName(), dataSource);
+ PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(jobConfig.getTargetDatabaseName(),
+ jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), sourceDataSourceMap, jobItemContext.getDataSourceManager(),
jobDataNodeLine, tableNameMap, tableNameSchemaNameMapping, sqlParserEngine);
PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, prepareTargetTablesParameter);
}
@@ -170,14 +167,6 @@ public final class MigrationJobPreparer {
return true;
}
- private String getActualTable(final ShardingSphereDatabase database, final String tableName) {
- DataNodes dataNodes = new DataNodes(database.getRuleMetaData().getRules());
- Optional<DataNode> filteredDataNode = dataNodes.getDataNodes(tableName).stream()
- .filter(each -> database.getResource().getDataSources().containsKey(each.getDataSourceName().contains(".") ? each.getDataSourceName().split("\\.")[0] : each.getDataSourceName()))
- .findFirst();
- return filteredDataNode.map(DataNode::getTableName).orElse(tableName);
- }
-
private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceMetaDataLoader(), jobItemContext.getDataSourceManager(),
jobItemContext.getJobProcessContext().getImporterExecuteEngine(), jobItemContext.getSourceDataSource(), jobItemContext.getTaskConfig(), jobItemContext.getInitProgress());
@@ -205,7 +194,7 @@ public final class MigrationJobPreparer {
*/
public void cleanup(final MigrationJobConfiguration jobConfig) {
try {
- PipelineJobPreparerUtils.destroyPosition(jobConfig.getSource());
+ PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), jobConfig.getSource());
} catch (final SQLException ex) {
log.warn("Scaling job destroying failed", ex);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
deleted file mode 100644
index 7d81e4c0960..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-
-import com.google.common.base.Preconditions;
-import com.google.common.eventbus.Subscribe;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Rule altered job worker.
- */
-@SuppressWarnings("UnstableApiUsage")
-@Slf4j
-public final class RuleAlteredJobWorker {
-
- private static final RuleAlteredJobWorker INSTANCE = new RuleAlteredJobWorker();
-
- private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
-
- /**
- * Get instance.
- *
- * @return instance
- */
- public static RuleAlteredJobWorker getInstance() {
- return INSTANCE;
- }
-
- /**
- * Is on rule altered action enabled.
- *
- * @param ruleConfig rule configuration
- * @return enabled or not
- */
- public static boolean isOnRuleAlteredActionEnabled(final RuleConfiguration ruleConfig) {
- if (null == ruleConfig) {
- return false;
- }
- Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(ruleConfig);
- return detector.isPresent() && detector.get().getOnRuleAlteredActionConfig(ruleConfig).isPresent();
- }
-
- /**
- * Create rule altered context.
- *
- * @param jobConfig job configuration
- * @return rule altered context
- */
- public static MigrationProcessContext createRuleAlteredContext(final MigrationJobConfiguration jobConfig) {
- YamlRootConfiguration targetRootConfig = getYamlRootConfig(jobConfig);
- YamlRuleConfiguration yamlRuleConfig = null;
- for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
- if (jobConfig.getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) {
- yamlRuleConfig = each;
- break;
- }
- }
- if (null == yamlRuleConfig) {
- throw new PipelineJobCreationException("could not find altered rule");
- }
- RuleConfiguration ruleConfig = SWAPPER_ENGINE.swapToRuleConfiguration(yamlRuleConfig);
- Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(ruleConfig);
- Preconditions.checkState(detector.isPresent());
- Optional<OnRuleAlteredActionConfiguration> onRuleAlteredActionConfig = detector.get().getOnRuleAlteredActionConfig(ruleConfig);
- if (!onRuleAlteredActionConfig.isPresent()) {
- log.error("rule altered action enabled but actor is not configured, ignored, ruleConfig={}", ruleConfig);
- throw new PipelineJobCreationException("rule altered actor not configured");
- }
- return new MigrationProcessContext(jobConfig.getJobId(), onRuleAlteredActionConfig.get());
- }
-
- /**
- * Get YAML root configuration, which should include rule altered action configuration.
- *
- * @param jobConfig job configuration
- * @return YAML root configuration
- */
- private static YamlRootConfiguration getYamlRootConfig(final MigrationJobConfiguration jobConfig) {
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
- if (targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
- return ((ShardingSpherePipelineDataSourceConfiguration) targetDataSourceConfig).getRootConfig();
- }
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
- return ((ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig).getRootConfig();
- }
-
- /**
- * Start scaling job.
- *
- * @param event start scaling event.
- */
- @Subscribe
- public void start(final StartScalingEvent event) {
- log.info("Start scaling job by {}", event);
- if (hasUncompletedJobOfSameDatabaseName(event.getDatabaseName())) {
- log.warn("There is uncompleted job with the same database name, please handle it first, current job will be ignored");
- return;
- }
- Optional<MigrationJobConfiguration> jobConfig = createJobConfig(event);
- if (jobConfig.isPresent()) {
- MigrationJobAPIFactory.getInstance().start(jobConfig.get());
- } else {
- log.info("Switch rule configuration immediately.");
- ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getDatabaseName(), event.getActiveVersion(), event.getNewVersion());
- PipelineContext.getContextManager().getInstanceContext().getEventBusContext().post(taskFinishedEvent);
- }
- }
-
- private Optional<MigrationJobConfiguration> createJobConfig(final StartScalingEvent event) {
- YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getSourceDataSource(), event.getSourceRule());
- YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getTargetDataSource(), event.getTargetRule());
- Map<String, List<String>> alteredRuleYamlClassNameTablesMap = new HashMap<>();
- for (Pair<YamlRuleConfiguration, YamlRuleConfiguration> each : groupSourceTargetRuleConfigsByType(sourceRootConfig.getRules(), targetRootConfig.getRules())) {
- YamlRuleConfiguration yamlRuleConfig = null == each.getLeft() ? each.getRight() : each.getLeft();
- Optional<RuleAlteredDetector> detector = RuleAlteredDetectorFactory.findInstance(yamlRuleConfig);
- if (!detector.isPresent()) {
- continue;
- }
- List<String> ruleAlteredLogicTables = detector.get().findRuleAlteredLogicTables(each.getLeft(), each.getRight(), sourceRootConfig.getDataSources(), targetRootConfig.getDataSources());
- log.info("type={}, ruleAlteredLogicTables={}", yamlRuleConfig.getClass().getName(), ruleAlteredLogicTables);
- if (!ruleAlteredLogicTables.isEmpty()) {
- alteredRuleYamlClassNameTablesMap.put(yamlRuleConfig.getClass().getName(), ruleAlteredLogicTables);
- }
- }
- if (alteredRuleYamlClassNameTablesMap.isEmpty()) {
- log.error("no altered rule");
- throw new PipelineJobCreationException("no altered rule");
- }
- if (alteredRuleYamlClassNameTablesMap.size() > 1) {
- log.error("more than 1 rule altered");
- throw new PipelineJobCreationException("more than 1 rule altered");
- }
- YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
- result.setDatabaseName(event.getDatabaseName());
- result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
- result.setActiveVersion(event.getActiveVersion());
- result.setNewVersion(event.getNewVersion());
- result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
- result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
- PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
- return Optional.of(new YamlMigrationJobConfigurationSwapper().swapToObject(result));
- }
-
- private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules,
- final Collection<YamlRuleConfiguration> targetRules) {
- Map<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> sourceRulesMap = sourceRules.stream().collect(Collectors.toMap(YamlRuleConfiguration::getClass, Function.identity()));
- Map<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> targetRulesMap = targetRules.stream().collect(Collectors.toMap(YamlRuleConfiguration::getClass, Function.identity()));
- Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> result = new LinkedList<>();
- for (Entry<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> entry : sourceRulesMap.entrySet()) {
- YamlRuleConfiguration targetRule = targetRulesMap.get(entry.getKey());
- result.add(Pair.of(entry.getValue(), targetRule));
- }
- for (Entry<Class<? extends YamlRuleConfiguration>, YamlRuleConfiguration> entry : targetRulesMap.entrySet()) {
- if (!sourceRulesMap.containsKey(entry.getKey())) {
- result.add(Pair.of(null, entry.getValue()));
- }
- }
- return result;
- }
-
- private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final YamlRootConfiguration yamlConfig) {
- PipelineDataSourceConfiguration config = new ShardingSpherePipelineDataSourceConfiguration(yamlConfig);
- YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
- result.setType(config.getType());
- result.setParameter(config.getParameter());
- return result;
- }
-
- @SuppressWarnings("unchecked")
- private YamlRootConfiguration getYamlRootConfiguration(final String databaseName, final String dataSources, final String rules) {
- YamlRootConfiguration result = new YamlRootConfiguration();
- result.setDatabaseName(databaseName);
- Map<String, Map<String, Object>> yamlDataSources = YamlEngine.unmarshal(dataSources, Map.class);
- result.setDataSources(yamlDataSources);
- Collection<YamlRuleConfiguration> yamlRuleConfigs = YamlEngine.unmarshal(rules, Collection.class, true);
- result.setRules(yamlRuleConfigs);
- return result;
- }
-
- private boolean hasUncompletedJobOfSameDatabaseName(final String databaseName) {
- boolean result = false;
- for (PipelineJobInfo each : MigrationJobAPIFactory.getInstance().list()) {
- if (MigrationJobAPIFactory.getInstance().getJobProgress(each.getJobId()).values().stream()
- .allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
- continue;
- }
- MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(each.getJobParameter());
- if (databaseName.equals(jobConfig.getDatabaseName())) {
- result = true;
- break;
- }
- }
- return result;
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 424ee393d5b..a95657857cf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContex
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
@@ -166,6 +167,10 @@ public final class MigrationJobAPIFixture implements MigrationJobAPI {
public void dropMigrationSourceResources(final Collection<String> resourceNames) {
}
+ @Override
+ public void createJobAndStart(final CreateMigrationJobParameter parameter) {
+ }
+
@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return null;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
index 0d57ff5638c..0f423a77a86 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
public final class FixturePositionInitializer implements PositionInitializer {
@Override
- public IngestPosition<?> init(final DataSource dataSource) throws SQLException {
+ public IngestPosition<?> init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
return null;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 2b9ceeb066e..7263a01bab3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -30,10 +30,9 @@ public final class PipelineJobIdUtilsTest {
public void assertCodec() {
MigrationJobId pipelineJobId = new MigrationJobId();
pipelineJobId.setTypeCode(JobType.MIGRATION.getTypeCode());
- pipelineJobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
pipelineJobId.setDatabaseName("sharding_db");
- pipelineJobId.setCurrentMetadataVersion(0);
- pipelineJobId.setNewMetadataVersion(1);
+ pipelineJobId.setSourceDataSourceName("new_0");
+ pipelineJobId.setTableName("t_order");
String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
assertThat(actualJobType, is(JobType.MIGRATION));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java
index 637afcd625e..8411be2acdb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializer.java
@@ -33,7 +33,7 @@ import java.sql.SQLException;
public final class MySQLPositionInitializer implements PositionInitializer {
@Override
- public BinlogPosition init(final DataSource dataSource) throws SQLException {
+ public BinlogPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
BinlogPosition result = getBinlogPosition(connection);
result.setServerId(getServerId(connection));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java
index a5501bbfd46..40f8df40373 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLPositionInitializerTest.java
@@ -62,7 +62,7 @@ public final class MySQLPositionInitializerTest {
@Test
public void assertGetCurrentPosition() throws SQLException {
MySQLPositionInitializer mySQLPositionInitializer = new MySQLPositionInitializer();
- BinlogPosition actual = mySQLPositionInitializer.init(dataSource);
+ BinlogPosition actual = mySQLPositionInitializer.init(dataSource, "");
assertThat(actual.getServerId(), is(SERVER_ID));
assertThat(actual.getFilename(), is(LOG_FILE_NAME));
assertThat(actual.getPosition(), is(LOG_POSITION));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 98cc575d786..5686c12076b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -35,7 +34,6 @@ import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
-import java.util.Collections;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
@@ -86,7 +84,6 @@ public final class MySQLDataSourcePreparerTest {
when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
when(jobConfig.getTarget().getParameter()).thenReturn("target");
when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
- when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList()));
}
@Test
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index c05d33c4e3c..49ec98c05d4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -45,9 +45,9 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
@Override
- public WalPosition init(final DataSource dataSource) throws SQLException {
+ public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- createSlotIfNotExist(connection);
+ createSlotIfNotExist(connection, slotNameSuffix);
return getWalPosition(connection);
}
}
@@ -63,10 +63,10 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
* @param connection connection
* @throws SQLException SQL exception
*/
- private void createSlotIfNotExist(final Connection connection) throws SQLException {
- String slotName = getUniqueSlotName(connection);
+ private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
+ String slotName = getUniqueSlotName(connection, slotNameSuffix);
if (!isSlotExist(connection, slotName)) {
- createSlotBySQL(connection);
+ createSlotBySQL(connection, slotName);
}
}
@@ -81,8 +81,8 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
}
}
- private void createSlotBySQL(final Connection connection) throws SQLException {
- String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", getUniqueSlotName(connection), DECODE_PLUGIN);
+ private void createSlotBySQL(final Connection connection, final String slotName) throws SQLException {
+ String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
} catch (final SQLException ex) {
@@ -102,14 +102,14 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
}
@Override
- public void destroy(final DataSource dataSource) throws SQLException {
+ public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- dropSlotIfExist(connection);
+ dropSlotIfExist(connection, slotNameSuffix);
}
}
- private void dropSlotIfExist(final Connection connection) throws SQLException {
- String slotName = getUniqueSlotName(connection);
+ private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
+ String slotName = getUniqueSlotName(connection, slotNameSuffix);
if (!isSlotExist(connection, slotName)) {
log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
return;
@@ -124,12 +124,13 @@ public final class OpenGaussPositionInitializer implements PositionInitializer {
* Get the unique slot name by connection.
*
* @param connection connection
+ * @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
- public static String getUniqueSlotName(final Connection connection) throws SQLException {
+ public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// same as PostgreSQL, but length over 64 will throw an exception directly
- String slotName = DigestUtils.md5Hex(connection.getCatalog());
+ String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix));
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 2769ad9303a..8362bed849d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -77,7 +77,7 @@ public final class OpenGaussWalDumper extends AbstractIncrementalDumper<WalPosit
private void dump() {
PGReplicationStream stream = null;
try (PgConnection connection = getReplicationConnectionUnwrap()) {
- stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection));
+ stream = logicalReplication.createReplicationStream(connection, walPosition.getLogSequenceNumber(), OpenGaussPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()));
DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()));
while (isRunning()) {
ByteBuffer message = stream.readPending();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 178f4e5c3c4..02855ad13bf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -43,9 +43,9 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
@Override
- public WalPosition init(final DataSource dataSource) throws SQLException {
+ public WalPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- createSlotIfNotExist(connection, getUniqueSlotName(connection));
+ createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
return getWalPosition(connection);
}
}
@@ -101,14 +101,14 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
}
@Override
- public void destroy(final DataSource dataSource) throws SQLException {
+ public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- dropSlotIfExist(connection);
+ dropSlotIfExist(connection, slotNameSuffix);
}
}
- private void dropSlotIfExist(final Connection connection) throws SQLException {
- String slotName = getUniqueSlotName(connection);
+ private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
+ String slotName = getUniqueSlotName(connection, slotNameSuffix);
if (!isSlotExisting(connection, slotName)) {
log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
return;
@@ -125,12 +125,13 @@ public final class PostgreSQLPositionInitializer implements PositionInitializer
* Get the unique slot name by connection.
*
* @param connection the connection
+ * @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
- public static String getUniqueSlotName(final Connection connection) throws SQLException {
+ public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// PostgreSQL slot name maximum length can't exceed 64; it is automatically truncated when the length exceeds the limit
- String slotName = DigestUtils.md5Hex(connection.getCatalog());
+ String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix));
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index c844cb063d6..67f181511ab 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -79,7 +79,8 @@ public final class PostgreSQLWalDumper extends AbstractIncrementalDumper<WalPosi
// TODO use unified PgConnection
try (
Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperConfig.getDataSourceConfig());
- PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection), walPosition.getLogSequenceNumber())) {
+ PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperConfig.getJobId()),
+ walPosition.getLogSequenceNumber())) {
PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
while (isRunning()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
index 44f411357ee..99d6325270c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializerTest.java
@@ -61,8 +61,8 @@ public final class PostgreSQLPositionInitializerTest {
when(connection.getCatalog()).thenReturn("sharding_db");
when(connection.getMetaData()).thenReturn(databaseMetaData);
PreparedStatement lsn96PreparedStatement = mockPostgreSQL96LSN();
- when(connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", PostgreSQLPositionInitializer.getUniqueSlotName(connection), "test_decoding")))
- .thenReturn(mock(PreparedStatement.class));
+ when(connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", PostgreSQLPositionInitializer.getUniqueSlotName(connection, ""),
+ "test_decoding"))).thenReturn(mock(PreparedStatement.class));
when(connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()")).thenReturn(lsn96PreparedStatement);
PreparedStatement lsn10PreparedStatement = mockPostgreSQL10LSN();
when(connection.prepareStatement("SELECT PG_CURRENT_WAL_LSN()")).thenReturn(lsn10PreparedStatement);
@@ -73,7 +73,7 @@ public final class PostgreSQLPositionInitializerTest {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
- WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
+ WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
}
@@ -81,7 +81,7 @@ public final class PostgreSQLPositionInitializerTest {
public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
- WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
+ WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource, "");
assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
}
@@ -90,7 +90,7 @@ public final class PostgreSQLPositionInitializerTest {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
- new PostgreSQLPositionInitializer().init(dataSource);
+ new PostgreSQLPositionInitializer().init(dataSource, "");
}
@SneakyThrows(SQLException.class)
@@ -127,7 +127,7 @@ public final class PostgreSQLPositionInitializerTest {
mockSlotExistsOrNot(true);
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(connection.prepareStatement("SELECT pg_drop_replication_slot(?)")).thenReturn(preparedStatement);
- new PostgreSQLPositionInitializer().destroy(dataSource);
+ new PostgreSQLPositionInitializer().destroy(dataSource, "");
verify(preparedStatement).execute();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index fef17423483..7425970029a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -37,6 +37,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
@@ -51,6 +52,9 @@ import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -103,6 +107,7 @@ public final class PostgreSQLWalDumperTest {
}
PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password);
DumperConfiguration result = new DumperConfiguration();
+ result.setJobId("0101123455F45SCALING8898");
result.setDataSourceConfig(dataSourceConfig);
result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
@@ -116,13 +121,16 @@ public final class PostgreSQLWalDumperTest {
ReflectionUtil.setFieldValue(walDumper, "logicalReplication", logicalReplication);
when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
- when(pgConnection.getCatalog()).thenReturn("test_db");
- when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection), position.getLogSequenceNumber()))
- .thenReturn(pgReplicationStream);
- ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes());
- when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
- when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
- walDumper.start();
+ try (MockedStatic<PostgreSQLPositionInitializer> positionInitializer = mockStatic(PostgreSQLPositionInitializer.class)) {
+ positionInitializer.when(() -> PostgreSQLPositionInitializer.getUniqueSlotName(eq(pgConnection), anyString())).thenReturn("0101123455F45SCALING8898");
+ when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.getUniqueSlotName(pgConnection, ""), position.getLogSequenceNumber()))
+ .thenReturn(pgReplicationStream);
+ ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes());
+ when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
+ when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
+ // TODO NPE occurred here
+ walDumper.start();
+ }
} catch (final IngestException ignored) {
}
assertThat(channel.fetchRecords(100, 0).size(), is(1));
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
index 7e17bb3476a..da85cb3f2fb 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rdl/rule/RuleDefinitionBackendHandler.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.rule;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.distsql.parser.statement.rdl.RuleDefinitionStatement;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.distsql.exception.DistSQLException;
@@ -76,12 +75,7 @@ public final class RuleDefinitionBackendHandler<T extends RuleDefinitionStatemen
RuleConfiguration currentRuleConfig = findCurrentRuleConfiguration(database, ruleConfigClass).orElse(null);
ruleDefinitionUpdater.checkSQLStatement(database, sqlStatement, currentRuleConfig);
Optional<RuleDefinitionAlterPreprocessor> preprocessor = RuleDefinitionAlterPreprocessorFactory.findInstance(sqlStatement);
- if (!RuleAlteredJobWorker.isOnRuleAlteredActionEnabled(currentRuleConfig)) {
- if (RULE_ALTERED_ACTIONS.contains(sqlStatement.getClass().getCanonicalName())) {
- // TODO throw new RuntimeException("scaling is not enabled");
- log.warn("rule altered and scaling is not enabled.");
- }
- } else if (preprocessor.isPresent()) {
+ if (preprocessor.isPresent()) {
prepareScaling(database, sqlStatement, (RuleDefinitionAlterUpdater) ruleDefinitionUpdater, currentRuleConfig, preprocessor.get());
return new UpdateResponseHeader(sqlStatement);
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
index a84fa4e4360..9ac98ddad79 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
@@ -26,8 +26,6 @@ import org.apache.shardingsphere.test.integration.env.container.atomic.util.Data
import javax.xml.bind.JAXB;
import java.util.Objects;
-import static org.junit.Assert.assertTrue;
-
@Slf4j
public abstract class BaseExtraSQLITCase extends BaseITCase {
@@ -39,42 +37,23 @@ public abstract class BaseExtraSQLITCase extends BaseITCase {
extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseExtraSQLITCase.class.getClassLoader().getResource(parameterized.getScenario())), ExtraSQLCommand.class);
}
- protected void createNoUseTable() {
- executeWithLog("CREATE SHARDING TABLE RULE no_use (RESOURCES(ds_0, ds_1), SHARDING_COLUMN=sharding_id, TYPE(NAME='MOD',PROPERTIES('sharding-count'='4')))");
- executeWithLog("CREATE TABLE no_use(id int(11) NOT NULL,sharding_id int(11) NOT NULL, PRIMARY KEY (id))");
- }
-
- protected void createOrderTable() {
- executeWithLog(extraSQLCommand.getCreateTableOrder());
+ protected void createSourceOrderTable() {
+ sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder());
}
- protected void createTableIndexList(final String schema) {
+ protected void createSourceTableIndexList(final String schema) {
if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
- executeWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.t_order ( user_id )", schema));
+ sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS idx_user_id ON %s.t_order ( user_id )", schema));
} else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
- executeWithLog(String.format("CREATE INDEX idx_user_id ON %s.t_order ( user_id )", schema));
+ sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON %s.t_order ( user_id )", schema));
}
}
- protected void createOrderItemTable() {
- executeWithLog(extraSQLCommand.getCreateTableOrderItem());
+ protected void createSourceCommentOnList(final String schema) {
+ sourceExecuteWithLog(String.format("COMMENT ON COLUMN %s.t_order.user_id IS 'user id'", schema));
}
- private boolean executeSql(final String sql) {
- try {
- getJdbcTemplate().execute(sql);
- return true;
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- // TODO openGauss seem return the different error message, need to check it
- if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
- log.info("openGauss error msg:{}", ex.getMessage());
- return false;
- } else {
- assertTrue(ex.getMessage(), ex.getCause().getMessage().endsWith("The database sharding_db is read-only"));
- }
- return false;
- }
+ protected void createSourceOrderItemTable() {
+ sourceExecuteWithLog(extraSQLCommand.getCreateTableOrderItem());
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 8ccd188320e..b0dcc2f8433 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -29,12 +29,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.integration.data.pipeline.cases.command.CommonSQLCommand;
+import org.apache.shardingsphere.integration.data.pipeline.cases.command.MigrationDistSQLCommand;
import org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
import org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEnvTypeEnum;
import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
+import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import org.apache.shardingsphere.integration.data.pipeline.framework.watcher.ScalingWatcher;
@@ -44,17 +43,18 @@ import org.apache.shardingsphere.test.integration.env.container.atomic.util.Data
import org.apache.shardingsphere.test.integration.env.runtime.DataSourceEnvironment;
import org.junit.Rule;
import org.opengauss.util.PSQLException;
-import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.BadSqlGrammarException;
-import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import javax.xml.bind.JAXB;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -80,8 +80,6 @@ public abstract class BaseITCase {
protected static final String DS_0 = "scaling_it_0";
- protected static final String DS_1 = "scaling_it_1";
-
protected static final String DS_2 = "scaling_it_2";
protected static final String DS_3 = "scaling_it_3";
@@ -98,7 +96,7 @@ public abstract class BaseITCase {
private final BaseComposedContainer composedContainer;
- private final CommonSQLCommand commonSQLCommand;
+ private final MigrationDistSQLCommand migrationDistSQLCommand;
private final DatabaseType databaseType;
@@ -106,7 +104,9 @@ public abstract class BaseITCase {
private final String password;
- private JdbcTemplate jdbcTemplate;
+ private DataSource sourceDataSource;
+
+ private DataSource proxyDataSource;
@Setter
private Thread increaseTaskThread;
@@ -114,13 +114,13 @@ public abstract class BaseITCase {
public BaseITCase(final ScalingParameterized parameterized) {
databaseType = parameterized.getDatabaseType();
if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
- composedContainer = new DockerComposedContainer(parameterized.getDatabaseType(), parameterized.getDockerImageName());
+ composedContainer = new MigrationComposedContainer(parameterized.getDatabaseType(), parameterized.getDockerImageName());
} else {
composedContainer = new NativeComposedContainer(parameterized.getDatabaseType());
}
composedContainer.start();
if (ENV.getItEnvType() == ScalingITEnvTypeEnum.DOCKER) {
- DockerStorageContainer storageContainer = ((DockerComposedContainer) composedContainer).getStorageContainer();
+ DockerStorageContainer storageContainer = ((MigrationComposedContainer) composedContainer).getStorageContainer();
username = storageContainer.getUsername();
password = storageContainer.getUnifiedPassword();
} else {
@@ -131,12 +131,12 @@ public abstract class BaseITCase {
if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
cleanUpDataSource();
}
- commonSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")), CommonSQLCommand.class);
- scalingWatcher = new ScalingWatcher(composedContainer, jdbcTemplate);
+ migrationDistSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseITCase.class.getClassLoader().getResource("env/common/command.xml")), MigrationDistSQLCommand.class);
+ scalingWatcher = new ScalingWatcher(composedContainer);
}
private void cleanUpDataSource() {
- for (String each : Arrays.asList(DS_0, DS_1, DS_2, DS_3, DS_4)) {
+ for (String each : Arrays.asList(DS_0, DS_2, DS_3, DS_4)) {
composedContainer.cleanUpDatabase(each);
}
}
@@ -150,131 +150,104 @@ public abstract class BaseITCase {
try (Connection connection = DriverManager.getConnection(jdbcUrl, ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
try {
- executeWithLog(connection, "DROP DATABASE sharding_db");
+ connectionExecuteWithLog(connection, "DROP DATABASE sharding_db");
} catch (final SQLException ex) {
log.warn("Drop sharding_db failed, maybe it's not exist. error msg={}", ex.getMessage());
}
}
- executeWithLog(connection, "CREATE DATABASE sharding_db");
+ connectionExecuteWithLog(connection, "CREATE DATABASE sharding_db");
} catch (final SQLException ex) {
throw new IllegalStateException(ex);
}
- jdbcTemplate = new JdbcTemplate(getProxyDataSource("sharding_db"));
+ sourceDataSource = getDataSource(getActualJdbcUrlTemplate(DS_0, false), username, password);
+ proxyDataSource = getDataSource(composedContainer.getProxyJdbcUrl("sharding_db"), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
}
- protected DataSource getProxyDataSource(final String databaseName) {
+ private DataSource getDataSource(final String jdbcUrl, final String username, final String password) {
HikariDataSource result = new HikariDataSource();
result.setDriverClassName(DataSourceEnvironment.getDriverClassName(getDatabaseType()));
- result.setJdbcUrl(composedContainer.getProxyJdbcUrl(databaseName));
- result.setUsername(ProxyContainerConstants.USERNAME);
- result.setPassword(ProxyContainerConstants.PASSWORD);
+ result.setJdbcUrl(jdbcUrl);
+ result.setUsername(username);
+ result.setPassword(password);
result.setMaximumPoolSize(2);
result.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
return result;
}
- protected boolean waitShardingAlgorithmEffect(final int maxWaitTimes) {
- long startTime = System.currentTimeMillis();
- int waitTimes = 0;
- do {
- List<Map<String, Object>> result = queryForListWithLog("SHOW SHARDING ALGORITHMS");
- if (result.size() >= 3) {
- log.info("waitShardingAlgorithmEffect time consume: {}", System.currentTimeMillis() - startTime);
- return true;
- }
- ThreadUtil.sleep(2, TimeUnit.SECONDS);
- waitTimes++;
- } while (waitTimes <= maxWaitTimes);
- return false;
- }
-
- @SneakyThrows
+ @SneakyThrows(SQLException.class)
protected void addSourceResource() {
- // TODO if mysql can append database firstly, they can be combined
- if (databaseType instanceof MySQLDatabaseType) {
- try (Connection connection = DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl(""), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
- connection.createStatement().execute("USE sharding_db");
- addSourceResource0(connection);
- }
- } else {
- try (Connection connection = DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl("sharding_db"), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
- addSourceResource0(connection);
- }
+ try (Connection connection = DriverManager.getConnection(getComposedContainer().getProxyJdbcUrl("sharding_db"), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
+ addSourceResource0(connection);
}
- List<Map<String, Object>> resources = queryForListWithLog("SHOW DATABASE RESOURCES FROM sharding_db");
- assertThat(resources.size(), is(2));
}
- private void addSourceResource0(final Connection connection) throws SQLException {
- String addSourceResource = commonSQLCommand.getSourceAddResourceTemplate().replace("${user}", username)
+ @SneakyThrows(SQLException.class)
+ private void addSourceResource0(final Connection connection) {
+ if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
+ try {
+ connectionExecuteWithLog(connection, "DROP MIGRATION SOURCE RESOURCE ds_0");
+ } catch (final SQLException ex) {
+ log.warn("Drop migration source resource ds_0 failed, maybe it's not exist. error msg={}", ex.getMessage());
+ }
+ }
+ String addSourceResource = migrationDistSQLCommand.getAddMigrationSourceResourceTemplate().replace("${user}", username)
.replace("${password}", password)
- .replace("${ds0}", getActualJdbcUrlTemplate(DS_0))
- .replace("${ds1}", getActualJdbcUrlTemplate(DS_1));
- executeWithLog(connection, addSourceResource);
+ .replace("${ds0}", getActualJdbcUrlTemplate(DS_0, true));
+ connectionExecuteWithLog(connection, addSourceResource);
}
@SneakyThrows
protected void addTargetResource() {
- String addTargetResource = commonSQLCommand.getTargetAddResourceTemplate().replace("${user}", username)
+ String addTargetResource = migrationDistSQLCommand.getAddMigrationTargetResourceTemplate().replace("${user}", username)
.replace("${password}", password)
- .replace("${ds2}", getActualJdbcUrlTemplate(DS_2))
- .replace("${ds3}", getActualJdbcUrlTemplate(DS_3))
- .replace("${ds4}", getActualJdbcUrlTemplate(DS_4));
- executeWithLog(addTargetResource);
+ .replace("${ds2}", getActualJdbcUrlTemplate(DS_2, true))
+ .replace("${ds3}", getActualJdbcUrlTemplate(DS_3, true))
+ .replace("${ds4}", getActualJdbcUrlTemplate(DS_4, true));
+ proxyExecuteWithLog(addTargetResource, 2);
List<Map<String, Object>> resources = queryForListWithLog("SHOW DATABASE RESOURCES from sharding_db");
- assertThat(resources.size(), is(5));
- assertBeforeApplyScalingMetadataCorrectly();
+ assertThat(resources.size(), is(3));
}
- private String getActualJdbcUrlTemplate(final String databaseName) {
+ private String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
if (ScalingITEnvTypeEnum.DOCKER == ENV.getItEnvType()) {
- DockerStorageContainer storageContainer = ((DockerComposedContainer) composedContainer).getStorageContainer();
- return DataSourceEnvironment.getURL(getDatabaseType(), getDatabaseType().getType().toLowerCase() + ".host", storageContainer.getPort(), databaseName);
+ DockerStorageContainer storageContainer = ((MigrationComposedContainer) composedContainer).getStorageContainer();
+ if (isInContainer) {
+ return DataSourceEnvironment.getURL(getDatabaseType(), getDatabaseType().getType().toLowerCase() + ".host", storageContainer.getPort(), databaseName);
+ } else {
+ return DataSourceEnvironment.getURL(getDatabaseType(), storageContainer.getHost(), storageContainer.getFirstMappedPort(), databaseName);
+ }
}
return DataSourceEnvironment.getURL(getDatabaseType(), "127.0.0.1", ENV.getActualDataSourceDefaultPort(databaseType), databaseName);
}
- protected void initShardingAlgorithm() {
- executeWithLog(getCommonSQLCommand().getCreateDatabaseShardingAlgorithm());
- executeWithLog(getCommonSQLCommand().getCreateOrderShardingAlgorithm());
- executeWithLog(getCommonSQLCommand().getCreateOrderItemShardingAlgorithm());
+ protected void createTargetOrderTableRule() {
+ proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderTableRule(), 3);
}
- protected void createOrderTableRule() {
- executeWithLog(commonSQLCommand.getCreateOrderTableRule());
+ protected void createTargetOrderItemTableRule() {
+ proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(), 3);
}
- protected void createOrderItemTableRule() {
- executeWithLog(commonSQLCommand.getCreateOrderItemTableRule());
+ protected void startMigrationOrder() {
+ proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 5);
}
- protected void bindingShardingRule() {
- executeWithLog("CREATE SHARDING BINDING TABLE RULES (t_order,t_order_item)");
+ protected void startMigrationOrderItem() {
+ proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(), 5);
}
+ // TODO use new DistSQL
protected void createScalingRule() {
- if (ENV.getItEnvType() == ScalingITEnvTypeEnum.NATIVE) {
- try {
- List<Map<String, Object>> scalingList = jdbcTemplate.queryForList("SHOW MIGRATION LIST");
- for (Map<String, Object> each : scalingList) {
- String id = each.get("id").toString();
- executeWithLog(String.format("CLEAN MIGRATION '%s'", id), 0);
- }
- } catch (final DataAccessException ex) {
- log.error("Failed to show migration list. {}", ex.getMessage());
- }
- }
- executeWithLog("CREATE SHARDING SCALING RULE scaling_manual (INPUT(SHARDING_SIZE=1000), DATA_CONSISTENCY_CHECKER(TYPE(NAME='DATA_MATCH')))");
}
- protected void createSchema(final String schemaName) {
+ protected void createSourceSchema(final String schemaName) {
if (DatabaseTypeUtil.isPostgreSQL(databaseType)) {
- executeWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
+ sourceExecuteWithLog(String.format("CREATE SCHEMA IF NOT EXISTS %s", schemaName));
return;
}
if (DatabaseTypeUtil.isOpenGauss(databaseType)) {
try {
- executeWithLog(String.format("CREATE SCHEMA %s", schemaName));
+ sourceExecuteWithLog(String.format("CREATE SCHEMA %s", schemaName));
} catch (final BadSqlGrammarException ex) {
// only used for native mode.
if (ex.getCause() instanceof PSQLException && "42P06".equals(((PSQLException) ex.getCause()).getSQLState())) {
@@ -286,78 +259,90 @@ public abstract class BaseITCase {
}
}
- protected void executeWithLog(final Connection connection, final String sql) throws SQLException {
- log.info("connection execute:{}", sql);
- connection.createStatement().execute(sql);
- ThreadUtil.sleep(1, TimeUnit.SECONDS);
+ @SneakyThrows(SQLException.class)
+ protected void sourceExecuteWithLog(final String sql) {
+ log.info("source execute :{}", sql);
+ try (Connection connection = sourceDataSource.getConnection()) {
+ connection.createStatement().execute(sql);
+ }
}
- private void executeWithLog(final String sql, final int sleepSeconds) {
- log.info("jdbcTemplate execute:{}", sql);
- jdbcTemplate.execute(sql);
+ @SneakyThrows(SQLException.class)
+ protected void proxyExecuteWithLog(final String sql, final int sleepSeconds) {
+ log.info("proxy execute :{}", sql);
+ try (Connection connection = proxyDataSource.getConnection()) {
+ connection.createStatement().execute(sql);
+ }
ThreadUtil.sleep(Math.max(sleepSeconds, 0), TimeUnit.SECONDS);
}
- protected void executeWithLog(final String sql) {
- executeWithLog(sql, 2);
+ protected void connectionExecuteWithLog(final Connection connection, final String sql) throws SQLException {
+ log.info("connection execute:{}", sql);
+ connection.createStatement().execute(sql);
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
}
protected List<Map<String, Object>> queryForListWithLog(final String sql) {
int retryNumber = 0;
while (retryNumber <= 3) {
- try {
- return jdbcTemplate.queryForList(sql);
- } catch (final DataAccessException ex) {
+ try (Connection connection = proxyDataSource.getConnection()) {
+ ResultSet resultSet = connection.createStatement().executeQuery(sql);
+ return resultSetToList(resultSet);
+ } catch (final SQLException ex) {
log.error("data access error", ex);
}
- ThreadUtil.sleep(2, TimeUnit.SECONDS);
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
retryNumber++;
}
throw new RuntimeException("can't get result from proxy");
}
+ protected List<Map<String, Object>> resultSetToList(final ResultSet rs) throws SQLException {
+ ResultSetMetaData md = rs.getMetaData();
+ int columns = md.getColumnCount();
+ List<Map<String, Object>> results = new ArrayList<>();
+ while (rs.next()) {
+ Map<String, Object> row = new HashMap<>();
+ for (int i = 1; i <= columns; i++) {
+ row.put(md.getColumnLabel(i).toLowerCase(), rs.getObject(i));
+ }
+ results.add(row);
+ }
+ return results;
+ }
+
protected void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
setIncreaseTaskThread(new Thread(baseIncrementTask));
getIncreaseTaskThread().start();
}
- protected void stopScalingSourceWriting(final String jobId) {
- executeWithLog(String.format("STOP MIGRATION SOURCE WRITING '%s'", jobId));
- }
-
- protected void stopScaling(final String jobId) {
- executeWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
- }
-
- protected void startScaling(final String jobId) {
- executeWithLog(String.format("START MIGRATION '%s'", jobId), 10);
+ protected void stopMigration(final String jobId) {
+ proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
}
- protected void applyScaling(final String jobId) {
- assertBeforeApplyScalingMetadataCorrectly();
- executeWithLog(String.format("APPLY MIGRATION '%s'", jobId));
+ // TODO reopen later
+ protected void startMigrationByJob(final String jobId) {
+ proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 10);
}
- protected void assertBeforeApplyScalingMetadataCorrectly() {
- List<Map<String, Object>> previewResults = queryForListWithLog("PREVIEW SELECT COUNT(1) FROM t_order");
- assertThat("data_source_name name not correct, it's effective early, search watcher failed get more info",
- previewResults.stream().map(each -> each.get("data_source_name")).collect(Collectors.toSet()), is(new HashSet<>(Arrays.asList("ds_0", "ds_1"))));
+ protected List<String> listJobId() {
+ List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+ return jobList.stream().map(a -> a.get("id").toString()).collect(Collectors.toList());
}
- protected String getScalingJobId() {
- List<Map<String, Object>> scalingListMap = queryForListWithLog("SHOW MIGRATION LIST");
- String jobId = scalingListMap.get(0).get("id").toString();
- log.info("jobId: {}", jobId);
- return jobId;
+ protected String getJobIdByTableName(final String tableName) {
+ List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
+ return jobList.stream().filter(a -> a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new RuntimeException("not find target table")).get("id").toString();
}
- protected void waitScalingFinished(final String jobId) throws InterruptedException {
+ @SneakyThrows(InterruptedException.class)
+ protected void waitMigrationFinished(final String jobId) {
if (null != increaseTaskThread) {
TimeUnit.SECONDS.timedJoin(increaseTaskThread, 60);
}
log.info("jobId: {}", jobId);
- Set<String> actualStatus = null;
- for (int i = 0; i < 20; i++) {
+ Set<String> actualStatus;
+ for (int i = 0; i < 10; i++) {
List<Map<String, Object>> showScalingStatusResult = showScalingStatus(jobId);
log.info("show migration status result: {}", showScalingStatusResult);
actualStatus = showScalingStatusResult.stream().map(each -> each.get("status").toString()).collect(Collectors.toSet());
@@ -368,14 +353,13 @@ public abstract class BaseITCase {
} else if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
log.warn("one of the shardingItem was not started correctly");
}
- ThreadUtil.sleep(2, TimeUnit.SECONDS);
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
- assertThat(actualStatus, is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
}
protected void assertGreaterThanInitTableInitRows(final int tableInitRows, final String schema) {
String countSQL = StringUtils.isBlank(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema);
- Map<String, Object> actual = jdbcTemplate.queryForMap(countSQL);
+ Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
assertTrue("actual count " + actual.get("count"), Integer.parseInt(actual.get("count").toString()) > tableInitRows);
}
@@ -392,7 +376,7 @@ public abstract class BaseITCase {
}
boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
log.info("second check job result: {}", secondCheckJobResult);
- stopScalingSourceWriting(jobId);
+ proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
List<Map<String, Object>> checkScalingResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='DATA_MATCH')", jobId));
log.info("checkScalingResults: {}", checkScalingResults);
for (Map<String, Object> entry : checkScalingResults) {
@@ -408,17 +392,10 @@ public abstract class BaseITCase {
return false;
}
int incrementalIdleSeconds = Integer.parseInt(entry.get("incremental_idle_seconds").toString());
- if (incrementalIdleSeconds <= 3) {
+ if (incrementalIdleSeconds < 10) {
return false;
}
}
return true;
}
-
- protected void assertPreviewTableSuccess(final String tableName, final List<String> expect) {
- List<Map<String, Object>> actualResults = queryForListWithLog(String.format("PREVIEW SELECT COUNT(1) FROM %s", tableName));
- List<String> dataSourceNames = actualResults.stream().map(each -> String.valueOf(each.get("data_source_name"))).sorted().collect(Collectors.toList());
- Collections.sort(expect);
- assertThat(dataSourceNames, is(expect));
- }
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
similarity index 52%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
index 6cdbad8abf2..18a6ed2caa1 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
@@ -27,35 +27,23 @@ import javax.xml.bind.annotation.XmlRootElement;
@Data
@XmlRootElement(name = "command")
@XmlAccessorType(XmlAccessType.FIELD)
-public final class CommonSQLCommand {
+public final class MigrationDistSQLCommand {
- @XmlElement(name = "create-database-sharding-algorithm")
- private String createDatabaseShardingAlgorithm;
+ @XmlElement(name = "create-target-order-table-rule")
+ private String createTargetOrderTableRule;
- @XmlElement(name = "create-order-sharding-algorithm")
- private String createOrderShardingAlgorithm;
+ @XmlElement(name = "create-target-order-item-table-rule")
+ private String createTargetOrderItemTableRule;
- @XmlElement(name = "create-order-item-sharding-algorithm")
- private String createOrderItemShardingAlgorithm;
+ @XmlElement(name = "add-migration-source-resource-template")
+ private String addMigrationSourceResourceTemplate;
- @XmlElement(name = "create-order-table-rule")
- private String createOrderTableRule;
+ @XmlElement(name = "add-migration-target-resource-template")
+ private String addMigrationTargetResourceTemplate;
- @XmlElement(name = "create-order-item-table-rule")
- private String createOrderItemTableRule;
+ @XmlElement(name = "migration-order-single-table")
+ private String migrationOrderSingleTable;
- @XmlElement(name = "alter-sharding-algorithm")
- private String alterShardingAlgorithm;
-
- @XmlElement(name = "alter-order-with-item-auto-table-rule")
- private String alterOrderWithItemAutoTableRule;
-
- @XmlElement(name = "alter-order-auto-table-rule")
- private String alterOrderAutoTableRule;
-
- @XmlElement(name = "source-add-resource-template")
- private String sourceAddResourceTemplate;
-
- @XmlElement(name = "target-add-resource-template")
- private String targetAddResourceTemplate;
+ @XmlElement(name = "migration-order-item-single-table")
+ private String migrationOrderItemSingleTable;
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
similarity index 68%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index b2dbaf06509..d7a8fa4f61e 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -26,28 +26,27 @@ import org.apache.shardingsphere.integration.data.pipeline.env.enums.ScalingITEn
import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import org.springframework.jdbc.core.JdbcTemplate;
-import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import static org.junit.Assert.assertTrue;
-
/**
* General scaling test case that includes multiple cases.
*/
@Slf4j
@RunWith(Parameterized.class)
-public final class MySQLGeneralScalingIT extends BaseExtraSQLITCase {
+public final class MySQLMigrationGeneralIT extends BaseExtraSQLITCase {
private final ScalingParameterized parameterized;
- public MySQLGeneralScalingIT(final ScalingParameterized parameterized) {
+ public MySQLMigrationGeneralIT(final ScalingParameterized parameterized) {
super(parameterized);
this.parameterized = parameterized;
log.info("parameterized:{}", parameterized);
@@ -67,35 +66,39 @@ public final class MySQLGeneralScalingIT extends BaseExtraSQLITCase {
}
@Test
- public void assertManualScalingSuccess() throws InterruptedException {
- addSourceResource();
- initShardingAlgorithm();
- assertTrue(waitShardingAlgorithmEffect(15));
+ public void assertMigrationSuccess() {
createScalingRule();
- createOrderTableRule();
- createOrderItemTableRule();
- createNoUseTable();
- createOrderTable();
- createOrderItemTable();
+ createSourceOrderTable();
+ createSourceOrderItemTable();
+ addSourceResource();
+ addTargetResource();
+ createTargetOrderTableRule();
+ createTargetOrderItemTableRule();
SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
+ JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
for (int i = 0; i < TABLE_INIT_ROW_COUNT / 1000; i++) {
Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 1000);
- getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
- getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+ jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
+ jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
}
- addTargetResource();
- startIncrementTask(new MySQLIncrementTask(getJdbcTemplate(), keyGenerateAlgorithm, true, 20));
- executeWithLog(getCommonSQLCommand().getAlterOrderWithItemAutoTableRule());
- String jobId = getScalingJobId();
- waitScalingFinished(jobId);
- stopScaling(jobId);
- getJdbcTemplate().update("INSERT INTO t_order (id,order_id,user_id,status,t_json) VALUES (?, ?, ?, ?, ?)", keyGenerateAlgorithm.generateKey(), keyGenerateAlgorithm.generateKey(),
- 1, "afterStopScaling", "{}");
- startScaling(jobId);
+ startMigrationOrderItem();
+ checkOrderMigration(keyGenerateAlgorithm, jdbcTemplate);
+ checkOrderItemMigration();
+ }
+
+ private void checkOrderMigration(final KeyGenerateAlgorithm keyGenerateAlgorithm, final JdbcTemplate jdbcTemplate) {
+ startMigrationOrder();
+ startIncrementTask(new MySQLIncrementTask(jdbcTemplate, keyGenerateAlgorithm, true, 20));
+ String jobId = getJobIdByTableName("t_order");
+ waitMigrationFinished(jobId);
assertCheckScalingSuccess(jobId);
assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "");
- applyScaling(jobId);
- assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
- assertPreviewTableSuccess("t_order_item", Arrays.asList("ds_2", "ds_3", "ds_4"));
+ }
+
+ private void checkOrderItemMigration() {
+ startMigrationOrderItem();
+ String jobId = getJobIdByTableName("t_order_item");
+ waitMigrationFinished(jobId);
+ assertCheckScalingSuccess(jobId);
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
similarity index 67%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index c4358a615ef..6cd69231d09 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -31,24 +31,22 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import org.springframework.jdbc.core.JdbcTemplate;
-import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import static org.junit.Assert.assertTrue;
-
/**
* PostgreSQL general scaling test case. Also covers the openGauss type, which follows the same process.
*/
@Slf4j
@RunWith(Parameterized.class)
-public final class PostgreSQLGeneralScalingIT extends BaseExtraSQLITCase {
+public final class PostgreSQLMigrationGeneralIT extends BaseExtraSQLITCase {
private final ScalingParameterized parameterized;
- public PostgreSQLGeneralScalingIT(final ScalingParameterized parameterized) {
+ public PostgreSQLMigrationGeneralIT(final ScalingParameterized parameterized) {
super(parameterized);
this.parameterized = parameterized;
log.info("parameterized:{}", parameterized);
@@ -70,36 +68,39 @@ public final class PostgreSQLGeneralScalingIT extends BaseExtraSQLITCase {
}
@Test
- public void assertManualScalingSuccess() throws InterruptedException {
- addSourceResource();
- initShardingAlgorithm();
- assertTrue(waitShardingAlgorithmEffect(15));
+ public void assertMigrationSuccess() {
createScalingRule();
- createSchema("test");
- createOrderTableRule();
- createOrderItemTableRule();
- createOrderTable();
- createOrderItemTable();
- // TODO wait kernel support create index if not exists
- // createTableIndexList("test");
- executeWithLog("COMMENT ON COLUMN test.t_order.user_id IS 'user id';");
+ createSourceSchema("test");
+ createSourceOrderTable();
+ createSourceOrderItemTable();
+ createSourceTableIndexList("test");
+ createSourceCommentOnList("test");
+ addSourceResource();
+ addTargetResource();
+ createTargetOrderTableRule();
+ createTargetOrderItemTableRule();
SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
- getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
- getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
- addTargetResource();
- startIncrementTask(new PostgreSQLIncrementTask(getJdbcTemplate(), new SnowflakeKeyGenerateAlgorithm(), "test", true, 20));
- executeWithLog(getCommonSQLCommand().getAlterOrderWithItemAutoTableRule());
- String jobId = getScalingJobId();
- waitScalingFinished(jobId);
- stopScaling(jobId);
- executeWithLog(String.format("INSERT INTO test.t_order (id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", keyGenerateAlgorithm.generateKey(), System.currentTimeMillis(),
- 1, "afterStopScaling"));
- startScaling(jobId);
+ JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
+ jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
+ jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
+ checkOrderMigration(jdbcTemplate);
+ checkOrderItemMigration();
+ }
+
+ private void checkOrderMigration(final JdbcTemplate jdbcTemplate) {
+ startMigrationOrder();
+ startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, "test", false, 20));
+ String jobId = getJobIdByTableName("t_order");
+ waitMigrationFinished(jobId);
assertCheckScalingSuccess(jobId);
assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "test");
- applyScaling(jobId);
- assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
- assertPreviewTableSuccess("t_order_item", Arrays.asList("ds_2", "ds_3", "ds_4"));
+ }
+
+ private void checkOrderItemMigration() {
+ startMigrationOrderItem();
+ String jobId = getJobIdByTableName("t_order_item");
+ waitMigrationFinished(jobId);
+ assertCheckScalingSuccess(jobId);
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
similarity index 71%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyScalingIT.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index 416f462d6f9..fbe0c01c3ce 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -30,20 +30,18 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import static org.junit.Assert.assertTrue;
-
@RunWith(Parameterized.class)
@Slf4j
-public class TextPrimaryKeyScalingIT extends BaseExtraSQLITCase {
+public class TextPrimaryKeyMigrationIT extends BaseExtraSQLITCase {
- public TextPrimaryKeyScalingIT(final ScalingParameterized parameterized) {
+ public TextPrimaryKeyMigrationIT(final ScalingParameterized parameterized) {
super(parameterized);
log.info("parameterized:{}", parameterized);
}
@@ -67,29 +65,32 @@ public class TextPrimaryKeyScalingIT extends BaseExtraSQLITCase {
}
@Test
- public void assertTextPrimaryKeyScalingSuccess() throws InterruptedException {
- addSourceResource();
- initShardingAlgorithm();
- assertTrue(waitShardingAlgorithmEffect(15));
- createScalingRule();
- createOrderTableRule();
- createOrderTable();
+ public void assertTextPrimaryMigrationSuccess() throws SQLException {
+ createSourceOrderTable();
batchInsertOrder();
+ createScalingRule();
+ addSourceResource();
addTargetResource();
- executeWithLog(getCommonSQLCommand().getAlterOrderAutoTableRule());
- String jobId = getScalingJobId();
- waitScalingFinished(jobId);
+ createTargetOrderTableRule();
+ startMigrationOrder();
+ String jobId = listJobId().get(0);
+ waitMigrationFinished(jobId);
+ stopMigration(jobId);
assertCheckScalingSuccess(jobId);
- applyScaling(jobId);
- assertPreviewTableSuccess("t_order", Arrays.asList("ds_2", "ds_3", "ds_4"));
}
- private void batchInsertOrder() {
+ private void batchInsertOrder() throws SQLException {
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
- List<Object[]> orderData = new ArrayList<>(TABLE_INIT_ROW_COUNT);
- for (int i = 0; i < TABLE_INIT_ROW_COUNT; i++) {
- orderData.add(new Object[]{keyGenerateAlgorithm.generateKey(), ThreadLocalRandom.current().nextInt(0, 6), ThreadLocalRandom.current().nextInt(0, 6), "OK"});
+ try (Connection connection = getSourceDataSource().getConnection()) {
+ PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?,?,?,?)");
+ for (int i = 0; i < TABLE_INIT_ROW_COUNT; i++) {
+ preparedStatement.setObject(1, keyGenerateAlgorithm.generateKey());
+ preparedStatement.setObject(2, ThreadLocalRandom.current().nextInt(0, 6));
+ preparedStatement.setObject(3, ThreadLocalRandom.current().nextInt(0, 6));
+ preparedStatement.setObject(4, "OK");
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
}
- getJdbcTemplate().batchUpdate("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?,?,?,?)", orderData);
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
index f6faace0b69..14bebdc5c5f 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -79,7 +79,7 @@ public final class MySQLIncrementTask extends BaseIncrementTask {
private void updateOrderByPrimaryKey(final Object primaryKey) {
Object[] updateData = {"updated" + Instant.now().getEpochSecond(), ThreadLocalRandom.current().nextInt(0, 100), primaryKey};
jdbcTemplate.update("UPDATE t_order SET status = ?,t_unsigned_int = ? WHERE id = ?", updateData);
- jdbcTemplate.update("UPDATE t_order SET status = null,t_unsigned_int = 299,t_timestamp='0000-00-00 00:00:00' WHERE id = ?", primaryKey);
+ jdbcTemplate.update("UPDATE t_order SET status = null,t_unsigned_int = 299,t_datetime='0000-00-00 00:00:00' WHERE id = ?", primaryKey);
}
private void setNullToOrderFields(final Object primaryKey) {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index ff22ce7f4cd..352e5b00cc0 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -22,19 +22,21 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseIncrementTask;
import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.Instant;
+import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j
@RequiredArgsConstructor
public final class PostgreSQLIncrementTask extends BaseIncrementTask {
- private final JdbcTemplate jdbcTemplate;
+ private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new SnowflakeKeyGenerateAlgorithm();
- private final KeyGenerateAlgorithm keyGenerateAlgorithm;
+ private final JdbcTemplate jdbcTemplate;
private final String schema;
@@ -42,6 +44,12 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
private final int executeCountLimit;
+ static {
+ Properties props = new Properties();
+ props.setProperty("max-vibration-offset", "2");
+ KEY_GENERATE_ALGORITHM.init(props);
+ }
+
@Override
public void run() {
int executeCount = 0;
@@ -65,7 +73,7 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
private Object insertOrder() {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
- Object[] orderInsertDate = new Object[]{keyGenerateAlgorithm.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
+ Object[] orderInsertDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order (id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertDate);
return orderInsertDate[0];
}
@@ -73,7 +81,7 @@ public final class PostgreSQLIncrementTask extends BaseIncrementTask {
private Object insertOrderItem() {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
- Object[] orderInsertItemDate = new Object[]{keyGenerateAlgorithm.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
+ Object[] orderInsertItemDate = new Object[]{KEY_GENERATE_ALGORITHM.generateKey(), ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)", schema), orderInsertItemDate);
return orderInsertItemDate[0];
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
similarity index 95%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
index 79cc4b7bc39..95c505b9484 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/DockerComposedContainer.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/compose/MigrationComposedContainer.java
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.test.integration.env.runtime.DataSourceEnvironm
/**
* Composed container, include governance container and database container.
*/
-public final class DockerComposedContainer extends BaseComposedContainer {
+public final class MigrationComposedContainer extends BaseComposedContainer {
private final DatabaseType databaseType;
@@ -45,7 +45,7 @@ public final class DockerComposedContainer extends BaseComposedContainer {
@Getter
private final GovernanceContainer governanceContainer;
- public DockerComposedContainer(final DatabaseType databaseType, final String dockerImageName) {
+ public MigrationComposedContainer(final DatabaseType databaseType, final String dockerImageName) {
this.databaseType = databaseType;
governanceContainer = getContainers().registerContainer(new ZookeeperContainer());
storageContainer = getContainers().registerContainer((DockerStorageContainer) StorageContainerFactory.newInstance(databaseType, dockerImageName,
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
index 59e7f49af11..a87773aad14 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/watcher/ScalingWatcher.java
@@ -21,14 +21,13 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.BaseComposedContainer;
-import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.DockerComposedContainer;
+import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.MigrationComposedContainer;
import org.apache.shardingsphere.integration.data.pipeline.framework.container.compose.NativeComposedContainer;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.CuratorZookeeperRepository;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
-import org.springframework.jdbc.core.JdbcTemplate;
import java.util.LinkedHashMap;
import java.util.List;
@@ -41,8 +40,6 @@ public class ScalingWatcher extends TestWatcher {
private final BaseComposedContainer composedContainer;
- private final JdbcTemplate jdbcTemplate;
-
@Override
protected void failed(final Throwable e, final Description description) {
if (composedContainer instanceof NativeComposedContainer) {
@@ -50,17 +47,14 @@ public class ScalingWatcher extends TestWatcher {
return;
}
outputZookeeperData();
- List<Map<String, Object>> previewList = jdbcTemplate.queryForList("preview select * from t_order");
- List<Map<String, Object>> shardingAlgorithms = jdbcTemplate.queryForList("SHOW SHARDING ALGORITHMS");
- log.warn("watcher failed, preview:{}, shardingAlgorithms:{}", previewList, shardingAlgorithms);
}
private void outputZookeeperData() {
- DockerComposedContainer dockerComposedContainer = (DockerComposedContainer) composedContainer;
- DatabaseType databaseType = dockerComposedContainer.getStorageContainer().getDatabaseType();
+ MigrationComposedContainer migrationComposedContainer = (MigrationComposedContainer) composedContainer;
+ DatabaseType databaseType = migrationComposedContainer.getStorageContainer().getDatabaseType();
String namespace = "it_db_" + databaseType.getType().toLowerCase();
ClusterPersistRepositoryConfiguration config = new ClusterPersistRepositoryConfiguration("ZooKeeper", namespace,
- dockerComposedContainer.getGovernanceContainer().getServerLists(), new Properties());
+ migrationComposedContainer.getGovernanceContainer().getServerLists(), new Properties());
ClusterPersistRepository zookeeperRepository = new CuratorZookeeperRepository();
zookeeperRepository.init(config);
List<String> childrenKeys = zookeeperRepository.getChildrenKeys("/");
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
index 4b2d51b76ae..1ba93faaaf2 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
@@ -16,80 +16,15 @@
-->
<command>
- <create-database-sharding-algorithm>
- CREATE SHARDING ALGORITHM database_inline (
- TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="ds_${user_id % 2}")))
- </create-database-sharding-algorithm>
-
- <create-order-sharding-algorithm>
- CREATE SHARDING ALGORITHM t_order_inline (
- TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="t_order_${order_id % 2}")))
- </create-order-sharding-algorithm>
-
- <create-order-item-sharding-algorithm>
- CREATE SHARDING ALGORITHM t_order_item_inline (
- TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="t_order_item_${order_id % 2}")))
- </create-order-item-sharding-algorithm>
-
- <create-order-table-rule>
- CREATE SHARDING TABLE RULE t_order(
- RESOURCES(ds_0,ds_1),
- SHARDING_COLUMN=order_id,
- TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="4")),
- KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
- )
- </create-order-table-rule>
-
- <create-order-item-table-rule>
- CREATE SHARDING TABLE RULE t_order_item(
- RESOURCES(ds_0,ds_1),
- SHARDING_COLUMN=order_id,
- TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="4")),
- KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
- );
- </create-order-item-table-rule>
-
- <alter-sharding-algorithm>
- ALTER SHARDING ALGORITHM database_inline
- (TYPE(NAME="INLINE",PROPERTIES("algorithm-expression"="ds_${user_id % 3 + 2}")))
- </alter-sharding-algorithm>
-
- <alter-order-with-item-auto-table-rule>
- ALTER SHARDING TABLE RULE t_order(
- RESOURCES(ds_2, ds_3, ds_4),
- SHARDING_COLUMN=order_id,
- TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
- KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
- ),t_order_item(
- RESOURCES(ds_2, ds_3, ds_4),
- SHARDING_COLUMN=order_id,
- TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
- KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
- )
- </alter-order-with-item-auto-table-rule>
-
- <alter-order-auto-table-rule>
- ALTER SHARDING TABLE RULE t_order(
- RESOURCES(ds_2, ds_3, ds_4),
- SHARDING_COLUMN=order_id,
- TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
- KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
- )
- </alter-order-auto-table-rule>
-
- <source-add-resource-template>
- ADD RESOURCE ds_0 (
+ <add-migration-source-resource-template>
+ ADD MIGRATION SOURCE RESOURCE ds_0 (
URL="${ds0}",
USER="${user}",
PASSWORD="${password}"
- ), ds_1 (
- URL="${ds1}",
- USER="${user}",
- PASSWORD="${password}"
- )
- </source-add-resource-template>
+ );
+ </add-migration-source-resource-template>
- <target-add-resource-template>
+ <add-migration-target-resource-template>
ADD RESOURCE ds_2 (
URL="${ds2}",
USER="${user}",
@@ -103,5 +38,31 @@
USER="${user}",
PASSWORD="${password}"
)
- </target-add-resource-template>
+ </add-migration-target-resource-template>
+
+ <create-target-order-table-rule>
+ CREATE SHARDING TABLE RULE t_order(
+ RESOURCES(ds_2,ds_3,ds_4),
+ SHARDING_COLUMN=order_id,
+ TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
+ KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+ )
+ </create-target-order-table-rule>
+
+ <create-target-order-item-table-rule>
+ CREATE SHARDING TABLE RULE t_order_item(
+ RESOURCES(ds_2,ds_3,ds_4),
+ SHARDING_COLUMN=order_id,
+ TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
+ KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+ );
+ </create-target-order-item-table-rule>
+
+ <migration-order-single-table>
+ MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;
+ </migration-order-single-table>
+
+ <migration-order-item-single-table>
+ MIGRATE TABLE ds_0.t_order_item INTO sharding_db.t_order_item;
+ </migration-order-item-single-table>
</command>
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
index 51baa22fcda..13e3417ebb2 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
@@ -26,6 +26,7 @@
<logger name="org.apache.shardingsphere.test.integration.env.container.atomic.DockerITContainer" level="WARN" />
<logger name="com.zaxxer.hikari.pool.ProxyConnection" level="OFF" />
<logger name="com.github.dockerjava" level="WARN"/>
+ <logger name="org.apache.zookeeper.ZooKeeper" level="OFF"/>
<root level="INFO">
<appender-ref ref="console" />
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 2797c0d23b1..9fbaeaf1a07 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -219,10 +219,9 @@ public final class MigrationJobAPIImplTest {
jobAPI.persistJobItemProgress(jobItemContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
- jobAPI.switchClusterConfiguration(jobId.get());
Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobId.get());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
- assertSame(entry.getValue().getStatus(), JobStatus.FINISHED);
+ assertSame(entry.getValue().getStatus(), JobStatus.EXECUTE_INVENTORY_TASK);
}
}
@@ -251,8 +250,8 @@ public final class MigrationJobAPIImplTest {
Connection connection = pipelineDataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
- statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
- statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+ statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT(11))");
+ statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 0), (999, 15)");
}
}
@@ -280,4 +279,10 @@ public final class MigrationJobAPIImplTest {
Map<String, DataSourceProperties> actual = persistService.load(JobType.MIGRATION);
assertTrue(actual.containsKey("ds_0"));
}
+
+ @Test
+ public void assertCreateJobConfig() {
+ // CreateMigrationJobParameter parameter = new CreateMigrationJobParameter();
+ // jobAPI.createJobConfig(parameter);
+ }
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 4adbd5ddff9..2f0e02f975a 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -64,8 +64,8 @@ public final class DataConsistencyCheckerTest {
Connection connection = new DefaultPipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
- statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
- statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
+ statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT(11))");
+ statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 1), (999, 10)");
}
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java
index 04b6d27095a..46622357323 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePositionInitializer.java
@@ -25,7 +25,7 @@ import javax.sql.DataSource;
public final class FixturePositionInitializer implements PositionInitializer {
@Override
- public PlaceholderPosition init(final DataSource dataSource) {
+ public PlaceholderPosition init(final DataSource dataSource, final String slotNameSuffix) {
return new PlaceholderPosition();
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index fd0d8e0ee05..e5818fb9667 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -57,7 +57,7 @@ public final class IncrementalTaskTest {
@Test
public void assertStart() {
incrementalTask.start();
- assertThat(incrementalTask.getTaskId(), is("ds_0"));
+ assertThat(incrementalTask.getTaskId(), is("standard_0"));
assertThat(incrementalTask.getTaskProgress().getPosition(), instanceOf(PlaceholderPosition.class));
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 3c8cd165b79..a06e5a3b8f6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -28,10 +28,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
-
-import java.util.Collections;
-import java.util.concurrent.ThreadLocalRandom;
/**
* Job configuration builder.
@@ -46,15 +42,14 @@ public final class JobConfigurationBuilder {
*/
public static MigrationJobConfiguration createJobConfiguration() {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
- result.setDatabaseName("logic_db");
- result.setAlteredRuleYamlClassNameTablesMap(Collections.singletonMap(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
- int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
- result.setActiveVersion(activeVersion);
- result.setNewVersion(activeVersion + 1);
+ result.setTargetDatabaseName("logic_db");
+ result.setSourceDataSourceName("standard_0");
// TODO add autoTables in config file
- result.setSource(createYamlPipelineDataSourceConfiguration(
- new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
- result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
+ result.setSource(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("migration_standard_jdbc_source.yaml"))));
+ result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
+ ConfigurationFileUtil.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
+ result.setSourceTableName("t_order");
+ result.setTargetTableName("t_order");
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_sharding_sphere_jdbc_target.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_sharding_sphere_jdbc_target.yaml
new file mode 100644
index 00000000000..784ee92542b
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_sharding_sphere_jdbc_target.yaml
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+dataSources:
+ ds_1:
+ dataSourceClassName: com.zaxxer.hikari.HikariDataSource
+ jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+ username: root
+ password: root
+ ds_2:
+ dataSourceClassName: com.zaxxer.hikari.HikariDataSource
+ jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+ username: root
+ password: root
+rules:
+ - !SHARDING
+ defaultDatabaseStrategy:
+ standard:
+ shardingAlgorithmName: default_db_inline
+ shardingColumn: user_id
+ tables:
+ t_order:
+ actualDataNodes: ds_$->{1..2}.t_order_$->{0..1}
+ keyGenerateStrategy:
+ column: order_id
+ keyGeneratorName: snowflake
+ tableStrategy:
+ standard:
+ shardingAlgorithmName: t_order_tbl_inline
+ shardingColumn: order_id
+ shardingAlgorithms:
+ default_db_inline:
+ type: INLINE
+ props:
+ algorithm-expression: ds_$->{user_id % 2 + 1}
+ t_order_tbl_inline:
+ type: INLINE
+ props:
+ algorithm-expression: t_order_$->{order_id % 2}
+
+ keyGenerators:
+ snowflake:
+ type: SNOWFLAKE
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_standard_jdbc_source.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_standard_jdbc_source.yaml
new file mode 100644
index 00000000000..ff51c87dc8e
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/migration_standard_jdbc_source.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+jdbcUrl: jdbc:h2:mem:standard_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
+username: root
+password: root
+dataSourceClassName: com.zaxxer.hikari.HikariDataSource
+minimumIdle: 1
+minPoolSize: 1
+maxPoolSize: 50
+maximumPoolSize: 50
+readOnly: false
+idleTimeout: 60000
+connectionTimeout: 30000
+maxLifetime: 1800000