You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/29 10:12:09 UTC
[shardingsphere] branch master updated: Refactor JobConfiguration, prepare for different types of jobs reuse and extension (#17205)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f16c9dc0261 Refactor JobConfiguration, prepare for different types of jobs reuse and extension (#17205)
f16c9dc0261 is described below
commit f16c9dc0261f50e40c31966b13c42c6bb342934d
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Apr 29 18:11:54 2022 +0800
Refactor JobConfiguration, prepare for different types of jobs reuse and extension (#17205)
* Rename JobConfiguration to RuleAlteredJobConfiguration
* Rename jobConfiguration to jobConfig
* Refactor databaseName from workflowConfig to jobConfig
* Refactor jobId from handleConfig to jobConfig
* Refactor jobShardingItem from handleConfig to jobConfig
* Refactor workflowConfig into jobConfig
* Refactor pipelineConfig into jobConfig
* Refactor handleConfig into jobConfig
---
...hardingRuleAlteredJobConfigurationPreparer.java | 52 +++---
...ingRuleAlteredJobConfigurationPreparerTest.java | 98 -----------
.../yaml/scaling/prepare/alter_rule_source.yaml | 88 ----------
.../prepare/auto_table_alter_rule_target.yaml | 94 -----------
.../scaling/prepare/table_alter_rule_target.yaml | 94 -----------
.../data/pipeline/api/RuleAlteredJobAPI.java | 14 +-
.../PipelineJobConfiguration.java} | 35 ++--
.../config/rulealtered/HandleConfiguration.java | 94 -----------
.../api/config/rulealtered/JobConfiguration.java | 97 -----------
.../config/rulealtered/PipelineConfiguration.java | 59 -------
.../rulealtered/RuleAlteredJobConfiguration.java | 184 +++++++++++++++++++++
.../api/config/rulealtered/TaskConfiguration.java | 2 +-
.../config/rulealtered/WorkflowConfiguration.java | 53 ------
.../RuleAlteredJobConfigurationPreparer.java | 17 +-
...t.java => RuleAlteredJobConfigurationTest.java} | 30 +---
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 2 +-
.../core/api/impl/RuleAlteredJobAPIImpl.java | 78 +++++----
.../check/consistency/DataConsistencyChecker.java | 30 ++--
.../pipeline/core/execute/PipelineJobExecutor.java | 13 +-
.../data/pipeline/core/job/FinishedCheckJob.java | 14 +-
.../datasource/AbstractDataSourcePreparer.java | 10 +-
.../datasource/PrepareTargetTablesParameter.java | 4 +-
.../scenario/rulealtered/RuleAlteredJob.java | 8 +-
.../rulealtered/RuleAlteredJobContext.java | 12 +-
.../rulealtered/RuleAlteredJobPreparer.java | 30 ++--
.../rulealtered/RuleAlteredJobScheduler.java | 4 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 61 +++----
.../rulealtered/prepare/InventoryTaskSplitter.java | 8 +-
.../job/environment/ScalingEnvironmentManager.java | 10 +-
.../datasource/MySQLDataSourcePreparer.java | 8 +-
.../datasource/MySQLDataSourcePreparerTest.java | 18 +-
.../datasource/OpenGaussDataSourcePreparer.java | 4 +-
.../datasource/PostgreSQLDataSourcePreparer.java | 8 +-
.../data/pipeline/env/ITEnvironmentContext.java | 11 +-
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 43 +++--
.../consistency/DataConsistencyCheckerTest.java | 4 +-
.../datasource/PipelineDataSourceManagerTest.java | 10 +-
.../core/util/JobConfigurationBuilder.java | 22 +--
.../scenario/rulealtered/RuleAlteredJobTest.java | 8 +-
.../rulealtered/RuleAlteredJobWorkerTest.java | 15 +-
.../prepare/InventoryTaskSplitterTest.java | 4 +-
41 files changed, 461 insertions(+), 989 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
index 0c47070e413..cc84a00639c 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -22,11 +22,9 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -68,23 +66,21 @@ import java.util.Set;
public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAlteredJobConfigurationPreparer {
@Override
- public HandleConfiguration createHandleConfiguration(final PipelineConfiguration pipelineConfig, final WorkflowConfiguration workflowConfig) {
- HandleConfiguration result = new HandleConfiguration();
- Map<String, List<DataNode>> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(pipelineConfig, workflowConfig);
- result.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes));
- result.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
- result.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
- return result;
+ public void extendJobConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ Map<String, List<DataNode>> shouldScalingActualDataNodes = getShouldScalingActualDataNodes(jobConfig);
+ jobConfig.setJobShardingDataNodes(getJobShardingDataNodes(shouldScalingActualDataNodes));
+ jobConfig.setLogicTables(getLogicTables(shouldScalingActualDataNodes.keySet()));
+ jobConfig.setTablesFirstDataNodes(getTablesFirstDataNodes(shouldScalingActualDataNodes));
}
- private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final PipelineConfiguration pipelineConfig, final WorkflowConfiguration workflowConfig) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
+ private static Map<String, List<DataNode>> getShouldScalingActualDataNodes(final RuleAlteredJobConfiguration jobConfig) {
+ PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
ShardingSpherePipelineDataSourceConfiguration source = (ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig;
ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(source.getRootConfig().getRules());
ShardingRule shardingRule = new ShardingRule(sourceRuleConfig, source.getRootConfig().getDataSources().keySet());
Map<String, TableRule> tableRules = shardingRule.getTableRules();
Map<String, List<DataNode>> result = new LinkedHashMap<>();
- Set<String> reShardNeededTables = new HashSet<>(workflowConfig.getAlteredRuleYamlClassNameTablesMap().get(YamlShardingRuleConfiguration.class.getName()));
+ Set<String> reShardNeededTables = new HashSet<>(jobConfig.getAlteredRuleYamlClassNameTablesMap().get(YamlShardingRuleConfiguration.class.getName()));
for (Entry<String, TableRule> entry : tableRules.entrySet()) {
if (reShardNeededTables.contains(entry.getKey())) {
result.put(entry.getKey(), entry.getValue().getActualDataNodes());
@@ -129,13 +125,13 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return new JobDataNodeLine(dataNodeEntries).marshal();
}
+ // TODO use jobConfig as parameter, jobShardingItem
@Override
- public TaskConfiguration createTaskConfiguration(final PipelineConfiguration pipelineConfig, final HandleConfiguration handleConfig,
- final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
- ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(pipelineConfig);
+ public TaskConfiguration createTaskConfiguration(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+ ShardingSpherePipelineDataSourceConfiguration sourceConfig = getSourceConfiguration(jobConfig);
ShardingRuleConfiguration sourceRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
Map<String, DataSourceProperties> dataSourcePropsMap = new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
- JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(handleConfig.getJobShardingDataNodes().get(handleConfig.getJobShardingItem()));
+ JobDataNodeLine dataNodeLine = JobDataNodeLine.unmarshal(jobConfig.getJobShardingDataNodes().get(jobConfig.getJobShardingItem()));
String dataSourceName = dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
Map<String, String> tableMap = new LinkedHashMap<>();
for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
@@ -144,20 +140,20 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
}
}
DumperConfiguration dumperConfig = createDumperConfiguration(dataSourceName, dataSourcePropsMap.get(dataSourceName).getAllLocalProperties(), tableMap);
- Optional<ShardingRuleConfiguration> targetRuleConfigOptional = getTargetRuleConfiguration(pipelineConfig);
- Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfigOptional.orElse(sourceRuleConfig), new HashSet<>(handleConfig.splitLogicTableNames()));
- ImporterConfiguration importerConfig = createImporterConfiguration(pipelineConfig, handleConfig, onRuleAlteredActionConfig, shardingColumnsMap);
- TaskConfiguration result = new TaskConfiguration(handleConfig, dumperConfig, importerConfig);
+ Optional<ShardingRuleConfiguration> targetRuleConfigOptional = getTargetRuleConfiguration(jobConfig);
+ Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfigOptional.orElse(sourceRuleConfig), new HashSet<>(jobConfig.splitLogicTableNames()));
+ ImporterConfiguration importerConfig = createImporterConfiguration(jobConfig, onRuleAlteredActionConfig, shardingColumnsMap);
+ TaskConfiguration result = new TaskConfiguration(jobConfig, dumperConfig, importerConfig);
log.info("createTaskConfiguration, dataSourceName={}, result={}", dataSourceName, result);
return result;
}
- private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final PipelineConfiguration pipelineConfig) {
- return (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
+ private static ShardingSpherePipelineDataSourceConfiguration getSourceConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ return (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
}
- private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final PipelineConfiguration pipelineConfig) {
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
+ private static Optional<ShardingRuleConfiguration> getTargetRuleConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
if (!(targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration)) {
return Optional.empty();
}
@@ -207,11 +203,11 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return result;
}
- private static ImporterConfiguration createImporterConfiguration(final PipelineConfiguration pipelineConfig, final HandleConfiguration handleConfig,
+ private static ImporterConfiguration createImporterConfiguration(final RuleAlteredJobConfiguration jobConfig,
final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig, final Map<String, Set<String>> shardingColumnsMap) {
- PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
+ PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
- int retryTimes = handleConfig.getRetryTimes();
+ int retryTimes = jobConfig.getRetryTimes();
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, batchSize, retryTimes);
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparerTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparerTest.java
deleted file mode 100644
index a0109d36bb8..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/schedule/ShardingRuleAlteredJobConfigurationPreparerTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.sharding.schedule;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
-import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration.OutputConfiguration;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public final class ShardingRuleAlteredJobConfigurationPreparerTest {
-
- @Mock
- private OnRuleAlteredActionConfiguration mockOnRuleAlteredActionConfiguration;
-
- private WorkflowConfiguration workflowConfiguration;
-
- @Before
- public void setUp() {
- workflowConfiguration = new WorkflowConfiguration("logic_db", ImmutableMap.of(YamlShardingRuleConfiguration.class.getName(),
- Collections.singletonList("t_order")), 0, 1);
- when(mockOnRuleAlteredActionConfiguration.getOutput()).thenReturn(new OutputConfiguration(5, 1000, null));
- }
-
- @Test
- public void assertAutoTableRuleCreateTaskConfiguration() throws IOException {
- PipelineConfiguration pipelineConfiguration = new PipelineConfiguration();
- setPipelineConfigurationSource(pipelineConfiguration);
- URL targetUrl = ShardingRuleAlteredJobConfigurationPreparerTest.class.getClassLoader().getResource("yaml/scaling/prepare/auto_table_alter_rule_target.yaml");
- assertNotNull(targetUrl);
- YamlPipelineDataSourceConfiguration target = YamlEngine.unmarshal(new File(targetUrl.getFile()), YamlPipelineDataSourceConfiguration.class);
- pipelineConfiguration.setTarget(target);
- JobConfiguration jobConfiguration = new JobConfiguration(workflowConfiguration, pipelineConfiguration);
- jobConfiguration.buildHandleConfig();
- ShardingRuleAlteredJobConfigurationPreparer preparer = new ShardingRuleAlteredJobConfigurationPreparer();
- TaskConfiguration taskConfiguration = preparer.createTaskConfiguration(pipelineConfiguration, jobConfiguration.getHandleConfig(), mockOnRuleAlteredActionConfiguration);
- assertEquals(taskConfiguration.getHandleConfig().getLogicTables(), "t_order");
- }
-
- @Test
- public void assertTableRuleCreateTaskConfiguration() throws IOException {
- PipelineConfiguration pipelineConfiguration = new PipelineConfiguration();
- setPipelineConfigurationSource(pipelineConfiguration);
- URL targetUrl = ShardingRuleAlteredJobConfigurationPreparerTest.class.getClassLoader().getResource("yaml/scaling/prepare/table_alter_rule_target.yaml");
- assertNotNull(targetUrl);
- YamlPipelineDataSourceConfiguration target = YamlEngine.unmarshal(new File(targetUrl.getFile()), YamlPipelineDataSourceConfiguration.class);
- pipelineConfiguration.setTarget(target);
- JobConfiguration jobConfiguration = new JobConfiguration(workflowConfiguration, pipelineConfiguration);
- jobConfiguration.buildHandleConfig();
- ShardingRuleAlteredJobConfigurationPreparer preparer = new ShardingRuleAlteredJobConfigurationPreparer();
- TaskConfiguration taskConfiguration = preparer.createTaskConfiguration(pipelineConfiguration, jobConfiguration.getHandleConfig(), mockOnRuleAlteredActionConfiguration);
- assertThat(taskConfiguration.getHandleConfig().getLogicTables(), is("t_order"));
- }
-
- private void setPipelineConfigurationSource(final PipelineConfiguration pipelineConfiguration) throws IOException {
- URL sourceUrl = ShardingRuleAlteredJobConfigurationPreparerTest.class.getClassLoader().getResource("yaml/scaling/prepare/alter_rule_source.yaml");
- assertNotNull(sourceUrl);
- YamlPipelineDataSourceConfiguration source = YamlEngine.unmarshal(new File(sourceUrl.getFile()), YamlPipelineDataSourceConfiguration.class);
- pipelineConfiguration.setSource(source);
- }
-}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/alter_rule_source.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/alter_rule_source.yaml
deleted file mode 100644
index ba874588719..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/alter_rule_source.yaml
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-parameter: |
- dataSources:
- ds_0:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_1:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_2:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_3:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_3;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_4:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_4;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- rules:
- - !SHARDING
- defaultDatabaseStrategy:
- standard:
- shardingAlgorithmName: database_inline
- shardingColumn: user_id
- defaultTableStrategy:
- none: ''
- keyGenerators:
- snowflake:
- type: SNOWFLAKE
- scaling:
- default_scaling:
- completionDetector:
- props:
- incremental-task-idle-minute-threshold: 30
- type: IDLE
- dataConsistencyChecker:
- props:
- chunk-size: 1000
- type: DATA_MATCH
- input:
- batchSize: 1000
- workerThread: 40
- output:
- batchSize: 1000
- workerThread: 40
- streamChannel:
- props:
- block-queue-size: 10000
- type: MEMORY
- scalingName: default_scaling
- shardingAlgorithms:
- database_inline:
- props:
- algorithm-expression: ds_${user_id % 2}
- type: INLINE
- t_order_inline:
- props:
- algorithm-expression: t_order_${order_id % 2}
- type: INLINE
- tables:
- t_order:
- actualDataNodes: ds_${0..1}.t_order_${0..1}
- keyGenerateStrategy:
- column: order_id
- keyGeneratorName: snowflake
- logicTable: t_order
- tableStrategy:
- standard:
- shardingAlgorithmName: t_order_inline
- shardingColumn: order_id
- schemaName: sharding_db
-type: ShardingSphereJDBC
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/auto_table_alter_rule_target.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/auto_table_alter_rule_target.yaml
deleted file mode 100644
index b1bfb8e7ebc..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/auto_table_alter_rule_target.yaml
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-parameter: |
- dataSources:
- ds_0:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_1:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_2:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_3:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_3;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_4:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_4;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- rules:
- - !SHARDING
- autoTables:
- t_order:
- actualDataSources: ds_2,ds_3,ds_4
- keyGenerateStrategy:
- column: order_id
- keyGeneratorName: t_order_snowflake
- logicTable: t_order
- shardingStrategy:
- standard:
- shardingAlgorithmName: t_order_hash_mod
- shardingColumn: order_id
- defaultDatabaseStrategy:
- standard:
- shardingAlgorithmName: database_inline
- shardingColumn: user_id
- defaultTableStrategy:
- none: ''
- keyGenerators:
- snowflake:
- type: SNOWFLAKE
- t_order_snowflake:
- type: snowflake
- scaling:
- default_scaling:
- completionDetector:
- props:
- incremental-task-idle-minute-threshold: 30
- type: IDLE
- dataConsistencyChecker:
- props:
- chunk-size: 1000
- type: DATA_MATCH
- input:
- batchSize: 1000
- workerThread: 40
- output:
- batchSize: 1000
- workerThread: 40
- streamChannel:
- props:
- block-queue-size: 10000
- type: MEMORY
- scalingName: default_scaling
- shardingAlgorithms:
- database_inline:
- props:
- algorithm-expression: ds_${user_id % 2}
- type: INLINE
- t_order_inline:
- props:
- algorithm-expression: t_order_${order_id % 2}
- type: INLINE
- t_order_hash_mod:
- props:
- sharding-count: '6'
- type: hash_mod
- schemaName: sharding_db
-type: ShardingSphereJDBC
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/table_alter_rule_target.yaml b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/table_alter_rule_target.yaml
deleted file mode 100644
index 9856cced6ef..00000000000
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/resources/yaml/scaling/prepare/table_alter_rule_target.yaml
+++ /dev/null
@@ -1,94 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-parameter: |
- dataSources:
- ds_0:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_1:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_1;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_2:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_3:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_3;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- ds_4:
- dataSourceClassName: org.apache.shardingsphere.test.mock.MockedDataSource
- jdbcUrl: jdbc:h2:mem:test_ds_4;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL
- rules:
- - !SHARDING
- defaultDatabaseStrategy:
- standard:
- shardingAlgorithmName: database_inline
- shardingColumn: user_id
- defaultTableStrategy:
- none: ''
- keyGenerators:
- snowflake:
- type: SNOWFLAKE
- t_order_snowflake:
- type: snowflake
- scaling:
- default_scaling:
- completionDetector:
- props:
- incremental-task-idle-minute-threshold: 30
- type: IDLE
- dataConsistencyChecker:
- props:
- chunk-size: 1000
- type: DATA_MATCH
- input:
- batchSize: 1000
- workerThread: 40
- output:
- batchSize: 1000
- workerThread: 40
- streamChannel:
- props:
- block-queue-size: 10000
- type: MEMORY
- scalingName: default_scaling
- shardingAlgorithms:
- database_inline:
- props:
- algorithm-expression: ds_${user_id % 2}
- type: INLINE
- t_order_inline:
- props:
- algorithm-expression: t_order_${order_id % 2}
- type: INLINE
- tables:
- t_order:
- actualDataNodes: ds_${2..4}.t_order_${0..1}
- databaseStrategy:
- standard:
- shardingAlgorithmName: database_inline
- shardingColumn: user_id
- keyGenerateStrategy:
- column: order_id
- keyGeneratorName: t_order_snowflake
- logicTable: t_order
- tableStrategy:
- standard:
- shardingAlgorithmName: t_order_inline
- shardingColumn: order_id
- schemaName: sharding_db
-type: ShardingSphereJDBC
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 56163342135..3f6774a65d6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
@@ -47,7 +47,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
* @param jobConfig job config
* @return job id
*/
- Optional<String> start(JobConfiguration jobConfig);
+ Optional<String> start(RuleAlteredJobConfiguration jobConfig);
/**
* Get job progress.
@@ -63,7 +63,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
* @param jobConfig job configuration
* @return each sharding item progress
*/
- Map<Integer, JobProgress> getProgress(JobConfiguration jobConfig);
+ Map<Integer, JobProgress> getProgress(RuleAlteredJobConfiguration jobConfig);
/**
* Stop cluster writing.
@@ -116,7 +116,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
* @param jobConfig job configuration
* @return data consistency check needed or not
*/
- boolean isDataConsistencyCheckNeeded(JobConfiguration jobConfig);
+ boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration jobConfig);
/**
* Do data consistency check.
@@ -132,7 +132,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
* @param jobConfig job configuration
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(JobConfiguration jobConfig);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig);
/**
* Do data consistency check.
@@ -164,7 +164,7 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
*
* @param jobConfig job configuration
*/
- void switchClusterConfiguration(JobConfiguration jobConfig);
+ void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
/**
* Reset scaling job.
@@ -179,5 +179,5 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
* @param jobId job id
* @return job configuration
*/
- JobConfiguration getJobConfig(String jobId);
+ RuleAlteredJobConfiguration getJobConfig(String jobId);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
similarity index 61%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
index ad8687c4818..932fcf17537 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/PipelineJobConfiguration.java
@@ -15,24 +15,31 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+package org.apache.shardingsphere.data.pipeline.api.config.job;
/**
- * Task configuration.
+ * Pipeline job configuration.
*/
-@Getter
-@RequiredArgsConstructor
-@ToString
-public final class TaskConfiguration {
+public interface PipelineJobConfiguration {
- private final HandleConfiguration handleConfig;
+ /**
+ * Get job id.
+ *
+ * @return job id
+ */
+ String getJobId();
- private final DumperConfiguration dumperConfig;
+ /**
+ * Get database name.
+ *
+ * @return database name
+ */
+ String getDatabaseName();
- private final ImporterConfiguration importerConfig;
+ /**
+ * Get job sharding item.
+ *
+ * @return job sharding item
+ */
+ Integer getJobShardingItem();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
deleted file mode 100644
index 6762e690b32..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfiguration.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import com.google.common.base.Splitter;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-import java.util.List;
-
-/**
- * Handle configuration.
- */
-@NoArgsConstructor
-@Getter
-@Setter
-@ToString
-// TODO rename
-public final class HandleConfiguration {
-
- private String jobId;
-
- private int concurrency = 3;
-
- private int retryTimes = 3;
-
- /**
- * Collection of each logic table's first data node.
- * <p>
- * If <pre>actualDataNodes: ds_${0..1}.t_order_${0..1}</pre> and <pre>actualDataNodes: ds_${0..1}.t_order_item_${0..1}</pre>,
- * then value may be: {@code t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}.
- * </p>
- */
- private String tablesFirstDataNodes;
-
- private List<String> jobShardingDataNodes;
-
- private String logicTables;
-
- /**
- * Job sharding item.
- */
- private Integer jobShardingItem;
-
- private int shardingSize = 1000 * 10000;
-
- private String sourceDatabaseType;
-
- private String targetDatabaseType;
-
- /**
- * Get job sharding count.
- *
- * @return job sharding count
- */
- public int getJobShardingCount() {
- return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
- }
-
- /**
- * Split {@linkplain #logicTables} to logic table names.
- *
- * @return logic table names
- */
- public List<String> splitLogicTableNames() {
- return Splitter.on(',').splitToList(logicTables);
- }
-
- /**
- * Get job ID digest.
- *
- * @return job ID digest
- */
- public String getJobIdDigest() {
- return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
deleted file mode 100644
index 310f35471fa..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/JobConfiguration.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import com.google.common.base.Strings;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
-import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
-
-import java.util.Collections;
-
-/**
- * Scaling job configuration.
- */
-@NoArgsConstructor
-@AllArgsConstructor
-@Getter
-@Setter
-@Slf4j
-// TODO share for totally new scenario
-public final class JobConfiguration {
-
- private WorkflowConfiguration workflowConfig;
-
- private PipelineConfiguration pipelineConfig;
-
- private HandleConfiguration handleConfig;
-
- public JobConfiguration(final WorkflowConfiguration workflowConfig, final PipelineConfiguration pipelineConfig) {
- this.workflowConfig = workflowConfig;
- this.pipelineConfig = pipelineConfig;
- }
-
- /**
- * Build handle configuration.
- */
- public void buildHandleConfig() {
- PipelineConfiguration pipelineConfig = getPipelineConfig();
- HandleConfiguration handleConfig = getHandleConfig();
- if (null == handleConfig || null == handleConfig.getJobShardingDataNodes()) {
- handleConfig = RuleAlteredJobConfigurationPreparerFactory.newInstance().createHandleConfiguration(pipelineConfig, getWorkflowConfig());
- this.handleConfig = handleConfig;
- }
- if (null == handleConfig.getJobId()) {
- handleConfig.setJobId(generateJobId());
- }
- if (Strings.isNullOrEmpty(handleConfig.getSourceDatabaseType())) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
- handleConfig.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getName());
- }
- if (Strings.isNullOrEmpty(handleConfig.getTargetDatabaseType())) {
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
- handleConfig.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getName());
- }
- if (null == handleConfig.getJobShardingItem()) {
- handleConfig.setJobShardingItem(0);
- }
- }
-
- private String generateJobId() {
- RuleAlteredJobId jobId = new RuleAlteredJobId();
- // TODO type, subTypes
- jobId.setType(JobType.RULE_ALTERED.getValue());
- jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
- jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
- WorkflowConfiguration workflowConfig = getWorkflowConfig();
- jobId.setCurrentMetadataVersion(workflowConfig.getActiveVersion());
- jobId.setNewMetadataVersion(workflowConfig.getNewVersion());
- jobId.setDatabaseName(workflowConfig.getDatabaseName());
- return jobId.marshal();
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java
deleted file mode 100644
index 0ecb924a6f0..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/PipelineConfiguration.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import com.google.common.base.Preconditions;
-import lombok.Getter;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-
-/**
- * Pipeline configuration.
- */
-@Getter
-public final class PipelineConfiguration {
-
- private YamlPipelineDataSourceConfiguration source;
-
- private YamlPipelineDataSourceConfiguration target;
-
- /**
- * Set source.
- *
- * @param source source configuration
- */
- public void setSource(final YamlPipelineDataSourceConfiguration source) {
- checkParameters(source);
- this.source = source;
- }
-
- /**
- * Set target.
- *
- * @param target target configuration
- */
- public void setTarget(final YamlPipelineDataSourceConfiguration target) {
- checkParameters(target);
- this.target = target;
- }
-
- private void checkParameters(final YamlPipelineDataSourceConfiguration yamlConfig) {
- Preconditions.checkNotNull(yamlConfig);
- Preconditions.checkNotNull(yamlConfig.getType());
- Preconditions.checkNotNull(yamlConfig.getParameter());
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
new file mode 100644
index 00000000000..ac0bc618ca7
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfiguration.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobSubType;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
+import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scaling job configuration.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+@Slf4j
+// TODO share for totally new scenario
+// TODO rename to Yaml, add config class
+public final class RuleAlteredJobConfiguration implements PipelineJobConfiguration {
+
+ private String jobId;
+
+ private String databaseName;
+
+ // TODO it should not put in jobConfig since it's mutable
+ private Integer jobShardingItem;
+
+ /**
+ * Map{altered rule yaml class name, re-shard needed table names}.
+ */
+ private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
+
+ private Integer activeVersion;
+
+ private Integer newVersion;
+
+ private YamlPipelineDataSourceConfiguration source;
+
+ private YamlPipelineDataSourceConfiguration target;
+
+ private int concurrency = 3;
+
+ private int retryTimes = 3;
+
+ /**
+ * Collection of each logic table's first data node.
+ * <p>
+ * If <pre>actualDataNodes: ds_${0..1}.t_order_${0..1}</pre> and <pre>actualDataNodes: ds_${0..1}.t_order_item_${0..1}</pre>,
+ * then value may be: {@code t_order:ds_0.t_order_0|t_order_item:ds_0.t_order_item_0}.
+ * </p>
+ */
+ private String tablesFirstDataNodes;
+
+ private List<String> jobShardingDataNodes;
+
+ private String logicTables;
+
+ // TODO shardingSize should be configurable
+ private int shardingSize = 1000 * 10000;
+
+ private String sourceDatabaseType;
+
+ private String targetDatabaseType;
+
+ /**
+ * Set source.
+ *
+ * @param source source configuration
+ */
+ public void setSource(final YamlPipelineDataSourceConfiguration source) {
+ checkParameters(source);
+ this.source = source;
+ }
+
+ /**
+ * Set target.
+ *
+ * @param target target configuration
+ */
+ public void setTarget(final YamlPipelineDataSourceConfiguration target) {
+ checkParameters(target);
+ this.target = target;
+ }
+
+ private void checkParameters(final YamlPipelineDataSourceConfiguration yamlConfig) {
+ Preconditions.checkNotNull(yamlConfig);
+ Preconditions.checkNotNull(yamlConfig.getType());
+ Preconditions.checkNotNull(yamlConfig.getParameter());
+ }
+
+ /**
+ * Get job sharding count.
+ *
+ * @return job sharding count
+ */
+ public int getJobShardingCount() {
+ return null == jobShardingDataNodes ? 0 : jobShardingDataNodes.size();
+ }
+
+ /**
+ * Split {@linkplain #logicTables} to logic table names.
+ *
+ * @return logic table names
+ */
+ public List<String> splitLogicTableNames() {
+ return Splitter.on(',').splitToList(logicTables);
+ }
+
+ /**
+ * Build handle configuration.
+ */
+ public void buildHandleConfig() {
+ if (null == getJobShardingDataNodes()) {
+ RuleAlteredJobConfigurationPreparerFactory.newInstance().extendJobConfiguration(this);
+ }
+ if (null == jobId) {
+ jobId = generateJobId();
+ }
+ if (Strings.isNullOrEmpty(getSourceDatabaseType())) {
+ PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter());
+ setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getName());
+ }
+ if (Strings.isNullOrEmpty(getTargetDatabaseType())) {
+ PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter());
+ setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getName());
+ }
+ if (null == jobShardingItem) {
+ jobShardingItem = 0;
+ }
+ }
+
+ private String generateJobId() {
+ RuleAlteredJobId jobId = new RuleAlteredJobId();
+ // TODO type, subTypes
+ jobId.setType(JobType.RULE_ALTERED.getValue());
+ jobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+ jobId.setSubTypes(Collections.singletonList(JobSubType.SCALING.getValue()));
+ jobId.setCurrentMetadataVersion(activeVersion);
+ jobId.setNewMetadataVersion(newVersion);
+ jobId.setDatabaseName(databaseName);
+ return jobId.marshal();
+ }
+
+ @Override
+ public String toString() {
+ return "RuleAlteredJobConfiguration{"
+ + "jobId='" + jobId + '\'' + ", databaseName='" + databaseName + '\''
+ + ", activeVersion=" + activeVersion + ", newVersion=" + newVersion + ", shardingSize=" + shardingSize
+ + ", sourceDatabaseType='" + sourceDatabaseType + '\'' + ", targetDatabaseType='" + targetDatabaseType + '\''
+ + '}';
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
index ad8687c4818..e2f6e10b356 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/TaskConfiguration.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
@ToString
public final class TaskConfiguration {
- private final HandleConfiguration handleConfig;
+ private final RuleAlteredJobConfiguration jobConfig;
private final DumperConfiguration dumperConfig;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/WorkflowConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/WorkflowConfiguration.java
deleted file mode 100644
index 11dc65ad923..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/WorkflowConfiguration.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.api.config.rulealtered;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-import java.util.List;
-import java.util.Map;
-
-@NoArgsConstructor
-@Getter
-@Setter
-@ToString
-public final class WorkflowConfiguration {
-
- private long allowDelayMilliseconds = 60 * 1000L;
-
- private String databaseName;
-
- /**
- * Map{altered rule yaml class name, re-shard needed table names}.
- */
- private Map<String, List<String>> alteredRuleYamlClassNameTablesMap;
-
- private Integer activeVersion;
-
- private Integer newVersion;
-
- public WorkflowConfiguration(final String databaseName, final Map<String, List<String>> alteredRuleYamlClassNameTablesMap, final int activeVersion, final int newVersion) {
- this.databaseName = databaseName;
- this.alteredRuleYamlClassNameTablesMap = alteredRuleYamlClassNameTablesMap;
- this.activeVersion = activeVersion;
- this.newVersion = newVersion;
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index 557c86fac46..6d47590087d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -17,10 +17,8 @@
package org.apache.shardingsphere.data.pipeline.spi.rulealtered;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.spi.type.required.RequiredSPI;
@@ -30,21 +28,18 @@ import org.apache.shardingsphere.spi.type.required.RequiredSPI;
public interface RuleAlteredJobConfigurationPreparer extends RequiredSPI {
/**
- * Create handle configuration, used to build job configuration.
+ * Extend job configuration.
*
- * @param pipelineConfig pipeline configuration
- * @param workflowConfig workflow configuration
- * @return handle configuration
+ * @param jobConfig job configuration
*/
- HandleConfiguration createHandleConfiguration(PipelineConfiguration pipelineConfig, WorkflowConfiguration workflowConfig);
+ void extendJobConfiguration(RuleAlteredJobConfiguration jobConfig);
/**
* Create task configuration, used by underlying scheduler.
*
- * @param pipelineConfig pipeline configuration
- * @param handleConfig handle configuration
+ * @param jobConfig job configuration
* @param onRuleAlteredActionConfig action configuration
* @return task configuration
*/
- TaskConfiguration createTaskConfiguration(PipelineConfiguration pipelineConfig, HandleConfiguration handleConfig, OnRuleAlteredActionConfiguration onRuleAlteredActionConfig);
+ TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration jobConfig, OnRuleAlteredActionConfiguration onRuleAlteredActionConfig);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfigurationTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
similarity index 53%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfigurationTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
index 8b51502ad7a..e35d3436abd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/HandleConfigurationTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/config/rulealtered/RuleAlteredJobConfigurationTest.java
@@ -25,38 +25,24 @@ import java.util.Arrays;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class HandleConfigurationTest {
+public final class RuleAlteredJobConfigurationTest {
@Test
public void assertGetJobShardingCountByNull() {
- assertThat(new HandleConfiguration().getJobShardingCount(), is(0));
+ assertThat(new RuleAlteredJobConfiguration().getJobShardingCount(), is(0));
}
@Test
public void assertGetJobShardingCount() {
- HandleConfiguration handleConfig = new HandleConfiguration();
- handleConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
- assertThat(handleConfig.getJobShardingCount(), is(2));
+ RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+ jobConfig.setJobShardingDataNodes(Arrays.asList("node1", "node2"));
+ assertThat(jobConfig.getJobShardingCount(), is(2));
}
@Test
public void assertSplitLogicTableNames() {
- HandleConfiguration handleConfig = new HandleConfiguration();
- handleConfig.setLogicTables("foo_tbl,bar_tbl");
- assertThat(handleConfig.splitLogicTableNames(), is(Lists.newArrayList("foo_tbl", "bar_tbl")));
- }
-
- @Test
- public void assertGetJobIdDigestByLongName() {
- HandleConfiguration handleConfig = new HandleConfiguration();
- handleConfig.setJobId("abcdefg");
- assertThat(handleConfig.getJobIdDigest(), is("abcdef"));
- }
-
- @Test
- public void assertGetJobIdDigestByShortName() {
- HandleConfiguration handleConfiguration = new HandleConfiguration();
- handleConfiguration.setJobId("abcdef");
- assertThat(handleConfiguration.getJobIdDigest(), is("abcdef"));
+ RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+ jobConfig.setLogicTables("foo_tbl,bar_tbl");
+ assertThat(jobConfig.splitLogicTableNames(), is(Lists.newArrayList("foo_tbl", "bar_tbl")));
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 384ea534d12..61ae24f551f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -61,7 +61,7 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
public void persistJobProgress(final RuleAlteredJobContext jobContext) {
JobProgress jobProgress = new JobProgress();
jobProgress.setStatus(jobContext.getStatus());
- jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getHandleConfig().getSourceDatabaseType());
+ jobProgress.setSourceDatabaseType(jobContext.getJobConfig().getSourceDatabaseType());
jobProgress.setIncrementalTaskProgressMap(getIncrementalTaskProgressMap(jobContext));
jobProgress.setInventoryTaskProgressMap(getInventoryTaskProgressMap(jobContext));
String value = YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(jobProgress));
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 5edfcd16711..aca7e399b82 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,8 +21,7 @@ import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -91,10 +90,10 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
private JobInfo getJobInfo(final String jobName) {
JobInfo result = new JobInfo(jobName);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(result.getJobId());
- JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
result.setActive(!jobConfigPOJO.isDisabled());
- result.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
- result.setTables(jobConfig.getHandleConfig().getLogicTables());
+ result.setShardingTotalCount(jobConfig.getJobShardingCount());
+ result.setTables(jobConfig.getLogicTables());
result.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
result.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
result.setJobParameter(jobConfigPOJO.getJobParameter());
@@ -102,15 +101,15 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
}
@Override
- public Optional<String> start(final JobConfiguration jobConfig) {
+ public Optional<String> start(final RuleAlteredJobConfiguration jobConfig) {
jobConfig.buildHandleConfig();
- if (jobConfig.getHandleConfig().getJobShardingCount() == 0) {
+ if (jobConfig.getJobShardingCount() == 0) {
log.warn("Invalid scaling job config!");
throw new PipelineJobCreationException("handleConfig shardingTotalCount is 0");
}
- log.info("Start scaling job by {}", jobConfig.getHandleConfig());
+ log.info("Start scaling job by {}", jobConfig);
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- String jobId = jobConfig.getHandleConfig().getJobId();
+ String jobId = jobConfig.getJobId();
String jobConfigKey = String.format("%s/%s/config", DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
@@ -121,10 +120,10 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
return Optional.of(jobId);
}
- private String createJobConfig(final JobConfiguration jobConfig) {
+ private String createJobConfig(final RuleAlteredJobConfiguration jobConfig) {
JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
- jobConfigPOJO.setJobName(jobConfig.getHandleConfig().getJobId());
- jobConfigPOJO.setShardingTotalCount(jobConfig.getHandleConfig().getJobShardingCount());
+ jobConfigPOJO.setJobName(jobConfig.getJobId());
+ jobConfigPOJO.setShardingTotalCount(jobConfig.getJobShardingCount());
jobConfigPOJO.setJobParameter(YamlEngine.marshal(jobConfig));
jobConfigPOJO.getProps().setProperty("create_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
return YamlEngine.marshal(jobConfigPOJO);
@@ -137,10 +136,10 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
}
@Override
- public Map<Integer, JobProgress> getProgress(final JobConfiguration jobConfig) {
- String jobId = jobConfig.getHandleConfig().getJobId();
+ public Map<Integer, JobProgress> getProgress(final RuleAlteredJobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- return IntStream.range(0, jobConfig.getHandleConfig().getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
+ return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
JobProgress jobProgress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobProgress(jobId, each);
if (null != jobProgress) {
jobProgress.setActive(!jobConfigPOJO.isDisabled());
@@ -149,22 +148,22 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
}, LinkedHashMap::putAll);
}
- private void verifyManualMode(final JobConfiguration jobConfig) {
+ private void verifyManualMode(final RuleAlteredJobConfiguration jobConfig) {
RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
if (null != ruleAlteredContext.getCompletionDetectAlgorithm()) {
throw new PipelineVerifyFailedException("It's not necessary to do it in auto mode.");
}
}
- private void verifyJobNotCompleted(final JobConfiguration jobConfig) {
- if (RuleAlteredJobProgressDetector.isJobCompleted(jobConfig.getHandleConfig().getJobShardingCount(), getProgress(jobConfig).values())) {
+ private void verifyJobNotCompleted(final RuleAlteredJobConfiguration jobConfig) {
+ if (RuleAlteredJobProgressDetector.isJobCompleted(jobConfig.getJobShardingCount(), getProgress(jobConfig).values())) {
throw new PipelineVerifyFailedException("Job is completed, it's not necessary to do it.");
}
}
- private void verifySourceWritingStopped(final JobConfiguration jobConfig) {
+ private void verifySourceWritingStopped(final RuleAlteredJobConfiguration jobConfig) {
LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
- String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+ String databaseName = jobConfig.getDatabaseName();
ShardingSphereLock lock = lockContext.getGlobalLock(databaseName);
if (null == lock || !lock.isLocked()) {
throw new PipelineVerifyFailedException("Source writing is not stopped. You could run `STOP SCALING SOURCE WRITING {jobId}` to stop it.");
@@ -176,11 +175,11 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
checkModeConfig();
log.info("stopClusterWriteDB for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
verifyManualMode(jobConfig);
verifyJobNotStopped(jobConfigPOJO);
verifyJobNotCompleted(jobConfig);
- String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+ String databaseName = jobConfig.getDatabaseName();
stopClusterWriteDB(databaseName, jobId);
}
@@ -204,9 +203,9 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
checkModeConfig();
log.info("restoreClusterWriteDB for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
verifyManualMode(jobConfig);
- String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+ String databaseName = jobConfig.getDatabaseName();
restoreClusterWriteDB(databaseName, jobId);
}
@@ -242,12 +241,12 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
@Override
public boolean isDataConsistencyCheckNeeded(final String jobId) {
log.info("isDataConsistencyCheckNeeded for job {}", jobId);
- JobConfiguration jobConfig = getJobConfig(jobId);
+ RuleAlteredJobConfiguration jobConfig = getJobConfig(jobId);
return isDataConsistencyCheckNeeded(jobConfig);
}
@Override
- public boolean isDataConsistencyCheckNeeded(final JobConfiguration jobConfig) {
+ public boolean isDataConsistencyCheckNeeded(final RuleAlteredJobConfiguration jobConfig) {
RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
return isDataConsistencyCheckNeeded(ruleAlteredContext);
}
@@ -260,13 +259,13 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
checkModeConfig();
log.info("Data consistency check for job {}", jobId);
- JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+ RuleAlteredJobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig);
}
@Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final JobConfiguration jobConfig) {
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
if (!isDataConsistencyCheckNeeded(ruleAlteredContext)) {
log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
@@ -279,20 +278,20 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType) {
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
- JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
+ RuleAlteredJobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
verifyDataConsistencyCheck(jobConfig);
return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, new Properties()));
}
- private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
- String jobId = jobConfig.getHandleConfig().getJobId();
+ private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
+ String jobId = jobConfig.getJobId();
Map<String, DataConsistencyCheckResult> result = new DataConsistencyChecker(jobConfig).check(calculator);
log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getType(), result);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
return result;
}
- private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
+ private void verifyDataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
verifyManualMode(jobConfig);
verifySourceWritingStopped(jobConfig);
}
@@ -319,7 +318,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
checkModeConfig();
log.info("Switch cluster configuration for job {}", jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- JobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
+ RuleAlteredJobConfiguration jobConfig = getJobConfig(jobConfigPOJO);
verifyManualMode(jobConfig);
verifyJobNotStopped(jobConfigPOJO);
verifyJobNotCompleted(jobConfig);
@@ -327,8 +326,8 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
}
@Override
- public void switchClusterConfiguration(final JobConfiguration jobConfig) {
- String jobId = jobConfig.getHandleConfig().getJobId();
+ public void switchClusterConfiguration(final RuleAlteredJobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobId();
RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
if (isDataConsistencyCheckNeeded(ruleAlteredContext)) {
@@ -337,8 +336,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
throw new PipelineVerifyFailedException("Data consistency check is not finished or failed.");
}
}
- WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
- ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(workflowConfig.getDatabaseName(), workflowConfig.getActiveVersion(), workflowConfig.getNewVersion());
+ ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(jobConfig.getDatabaseName(), jobConfig.getActiveVersion(), jobConfig.getNewVersion());
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
// TODO rewrite job status update after job progress structure refactor
RuleAlteredJobSchedulerCenter.updateJobStatus(jobId, JobStatus.FINISHED);
@@ -363,11 +361,11 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
}
@Override
- public JobConfiguration getJobConfig(final String jobId) {
+ public RuleAlteredJobConfiguration getJobConfig(final String jobId) {
return getJobConfig(getElasticJobConfigPOJO(jobId));
}
- private JobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
- return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
+ private RuleAlteredJobConfiguration getJobConfig(final JobConfigurationPOJO elasticJobConfigPOJO) {
+ return YamlEngine.unmarshal(elasticJobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index d477a5e5133..38370424eda 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -68,13 +68,14 @@ import java.util.concurrent.TimeUnit;
*/
public final class DataConsistencyChecker {
- private final JobConfiguration jobConfig;
+ // TODO remove jobConfig for common usage
+ private final RuleAlteredJobConfiguration jobConfig;
private final Collection<String> logicTableNames;
- public DataConsistencyChecker(final JobConfiguration jobConfig) {
+ public DataConsistencyChecker(final RuleAlteredJobConfiguration jobConfig) {
this.jobConfig = jobConfig;
- logicTableNames = jobConfig.getHandleConfig().splitLogicTableNames();
+ logicTableNames = jobConfig.splitLogicTableNames();
}
/**
@@ -96,12 +97,10 @@ public final class DataConsistencyChecker {
}
private Map<String, DataConsistencyCountCheckResult> checkCount() {
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobIdDigest() + "-count-check-%d");
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-count-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
+ PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
+ PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(logicTableNames.size(), 1);
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
@@ -131,6 +130,11 @@ public final class DataConsistencyChecker {
}
}
+ // TODO use digest (crc32, murmurhash)
+ private String getJobIdDigest(final String jobId) {
+ return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
+ }
+
private long count(final DataSource dataSource, final String table, final DatabaseType databaseType) {
try (
Connection connection = dataSource.getConnection();
@@ -144,16 +148,16 @@ public final class DataConsistencyChecker {
}
private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getSource());
- PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getTarget());
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobIdDigest() + "-data-check-%d");
+ PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getSource());
+ PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getTarget());
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(logicTableNames.size(), 1);
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getWorkflowConfig().getDatabaseName());
+ Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getDatabaseName());
logicTableNames.forEach(each -> {
// TODO put to preparer
if (!tableMetaDataMap.containsKey(each)) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 49b0f1de24b..773f6116fbd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
@@ -65,22 +65,23 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
if (deleted || disabled) {
log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), deleted, disabled);
RuleAlteredJobSchedulerCenter.stop(jobConfigPOJO.getJobName());
- JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
+ // TODO refactor: dispatch to different job types
+ RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
if (deleted) {
new RuleAlteredJobPreparer().cleanup(jobConfig);
- } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getHandleConfig().getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
+ } else if (RuleAlteredJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), ruleAlteredJobAPI.getProgress(jobConfig).values())) {
log.info("isJobSuccessful=true");
new RuleAlteredJobPreparer().cleanup(jobConfig);
}
- ScalingReleaseDatabaseLevelLockEvent releaseLockEvent = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getWorkflowConfig().getDatabaseName());
+ ScalingReleaseDatabaseLevelLockEvent releaseLockEvent = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName());
ShardingSphereEventBus.getInstance().post(releaseLockEvent);
return;
}
switch (event.getType()) {
case ADDED:
case UPDATED:
- JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
- String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+ RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ String databaseName = jobConfig.getDatabaseName();
if (PipelineSimpleLock.getInstance().tryLock(databaseName, 1000)) {
execute(jobConfigPOJO);
} else {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 8a6006bde9d..b755976eec2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
@@ -55,7 +55,7 @@ public final class FinishedCheckJob implements SimpleJob {
}
try {
// TODO refactor: dispatch to different job types
- JobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), JobConfiguration.class, true);
+ RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(jobInfo.getJobParameter(), RuleAlteredJobConfiguration.class, true);
RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
if (null == ruleAlteredContext.getCompletionDetectAlgorithm()) {
log.info("completionDetector not configured, auto switch will not be enabled. You could query job progress and switch config manually with DistSQL.");
@@ -67,7 +67,7 @@ public final class FinishedCheckJob implements SimpleJob {
}
log.info("scaling job {} almost finished.", jobId);
RowBasedJobLockAlgorithm sourceWritingStopAlgorithm = ruleAlteredContext.getSourceWritingStopAlgorithm();
- String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+ String databaseName = jobConfig.getDatabaseName();
try {
if (null != sourceWritingStopAlgorithm) {
sourceWritingStopAlgorithm.lock(databaseName, jobId + "");
@@ -109,14 +109,14 @@ public final class FinishedCheckJob implements SimpleJob {
return flag;
}
- private boolean dataConsistencyCheck(final JobConfiguration jobConfig) {
- String jobId = jobConfig.getHandleConfig().getJobId();
+ private boolean dataConsistencyCheck(final RuleAlteredJobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobId();
log.info("dataConsistencyCheck for job {}", jobId);
return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, ruleAlteredJobAPI.dataConsistencyCheck(jobConfig));
}
- private void switchClusterConfiguration(final String databaseName, final JobConfiguration jobConfig, final RuleBasedJobLockAlgorithm checkoutLockAlgorithm) {
- String jobId = jobConfig.getHandleConfig().getJobId();
+ private void switchClusterConfiguration(final String databaseName, final RuleAlteredJobConfiguration jobConfig, final RuleBasedJobLockAlgorithm checkoutLockAlgorithm) {
+ String jobId = jobConfig.getJobId();
try {
if (null != checkoutLockAlgorithm) {
checkoutLockAlgorithm.lock(databaseName, jobId + "");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 1f423ff128a..5570b5d6fa3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
@@ -53,12 +53,12 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
private static final String[] IGNORE_EXCEPTION_MESSAGE = {"multiple primary keys for table", "already exists"};
- protected final PipelineDataSourceWrapper getSourceCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
- return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter()));
+ protected final PipelineDataSourceWrapper getSourceCachedDataSource(final RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
+ return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
}
- protected final PipelineDataSourceWrapper getTargetCachedDataSource(final PipelineConfiguration pipelineConfig, final PipelineDataSourceManager dataSourceManager) {
- return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter()));
+ protected final PipelineDataSourceWrapper getTargetCachedDataSource(final RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
+ return dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter()));
}
protected final void executeTargetTableSQL(final Connection targetConnection, final String sql) throws SQLException {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index c24bce303d0..41233157c1a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -35,7 +35,7 @@ public final class PrepareTargetTablesParameter {
private final JobDataNodeLine tablesFirstDataNodes;
@NonNull
- private final PipelineConfiguration pipelineConfiguration;
+ private final RuleAlteredJobConfiguration jobConfig;
@NonNull
private final PipelineDataSourceManager dataSourceManager;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 08c25ce57aa..b437dbfe324 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -42,8 +42,8 @@ public final class RuleAlteredJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
log.info("Execute job {}-{}", shardingContext.getJobName(), shardingContext.getShardingItem());
- JobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), JobConfiguration.class, true);
- jobConfig.getHandleConfig().setJobShardingItem(shardingContext.getShardingItem());
+ RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(shardingContext.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ jobConfig.setJobShardingItem(shardingContext.getShardingItem());
RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
jobContext.setInitProgress(governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()));
jobContext.setJobPreparer(jobPreparer);
@@ -56,7 +56,7 @@ public final class RuleAlteredJob implements SimpleJob {
RuleAlteredJobSchedulerCenter.stop(shardingContext.getJobName());
jobContext.setStatus(JobStatus.PREPARING_FAILURE);
governanceRepositoryAPI.persistJobProgress(jobContext);
- ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getWorkflowConfig().getDatabaseName());
+ ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName());
ShardingSphereEventBus.getInstance().post(event);
throw ex;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 32562af86ab..aa916d6a04a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -23,7 +23,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -59,7 +59,7 @@ public final class RuleAlteredJobContext {
private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
- private final JobConfiguration jobConfig;
+ private final RuleAlteredJobConfiguration jobConfig;
private final RuleAlteredContext ruleAlteredContext;
@@ -81,13 +81,13 @@ public final class RuleAlteredJobContext {
private RuleAlteredJobPreparer jobPreparer;
- public RuleAlteredJobContext(final JobConfiguration jobConfig) {
+ public RuleAlteredJobContext(final RuleAlteredJobConfiguration jobConfig) {
ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
this.jobConfig = jobConfig;
jobConfig.buildHandleConfig();
- jobId = jobConfig.getHandleConfig().getJobId();
- shardingItem = jobConfig.getHandleConfig().getJobShardingItem();
- taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig.getPipelineConfig(), jobConfig.getHandleConfig(), ruleAlteredContext.getOnRuleAlteredActionConfig());
+ jobId = jobConfig.getJobId();
+ shardingItem = jobConfig.getJobShardingItem();
+ taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, ruleAlteredContext.getOnRuleAlteredActionConfig());
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 0138c104663..5f97ac40df7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -94,9 +94,9 @@ public final class RuleAlteredJobPreparer {
}
private void prepareAndCheckTargetWithLock(final RuleAlteredJobContext jobContext) {
- JobConfiguration jobConfig = jobContext.getJobConfig();
+ RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
// TODO the lock will be replaced
- String lockName = "prepare-" + jobConfig.getHandleConfig().getJobId();
+ String lockName = "prepare-" + jobConfig.getJobId();
ShardingSphereLock lock = PipelineContext.getContextManager().getInstanceContext().getLockContext().getOrCreateGlobalLock(lockName);
if (lock.tryLock(lockName, 1)) {
try {
@@ -128,19 +128,19 @@ public final class RuleAlteredJobPreparer {
}
}
- private void prepareTarget(final JobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
- Optional<DataSourcePreparer> dataSourcePreparer = EnvironmentCheckerFactory.getDataSourcePreparer(jobConfig.getHandleConfig().getTargetDatabaseType());
+ private void prepareTarget(final RuleAlteredJobConfiguration jobConfig, final PipelineDataSourceManager dataSourceManager) {
+ Optional<DataSourcePreparer> dataSourcePreparer = EnvironmentCheckerFactory.getDataSourcePreparer(jobConfig.getTargetDatabaseType());
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
return;
}
- JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getHandleConfig().getTablesFirstDataNodes());
- PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig.getPipelineConfig(), dataSourceManager);
+ JobDataNodeLine tablesFirstDataNodes = JobDataNodeLine.unmarshal(jobConfig.getTablesFirstDataNodes());
+ PrepareTargetTablesParameter prepareTargetTablesParameter = new PrepareTargetTablesParameter(tablesFirstDataNodes, jobConfig, dataSourceManager);
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
}
private void checkSourceDataSource(final RuleAlteredJobContext jobContext) {
- DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getHandleConfig().getSourceDatabaseType());
+ DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getSourceDatabaseType());
Collection<PipelineDataSourceWrapper> sourceDataSources = Collections.singleton(jobContext.getSourceDataSource());
dataSourceChecker.checkConnection(sourceDataSources);
dataSourceChecker.checkPrivilege(sourceDataSources);
@@ -148,7 +148,7 @@ public final class RuleAlteredJobPreparer {
}
private void checkTargetDataSource(final RuleAlteredJobContext jobContext, final PipelineDataSourceWrapper targetDataSource) {
- DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getHandleConfig().getTargetDatabaseType());
+ DataSourceChecker dataSourceChecker = TypedSPIRegistry.getRegisteredService(DataSourceChecker.class, jobContext.getJobConfig().getTargetDatabaseType());
Collection<PipelineDataSourceWrapper> targetDataSources = Collections.singletonList(targetDataSource);
dataSourceChecker.checkConnection(targetDataSources);
dataSourceChecker.checkTargetTable(targetDataSources, jobContext.getTaskConfig().getImporterConfig().getShardingColumnsMap().keySet());
@@ -166,7 +166,7 @@ public final class RuleAlteredJobPreparer {
PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
- IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getHandleConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
+ IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getJobConfig().getConcurrency(), taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
pipelineChannelFactory, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
jobContext.getIncrementalTasks().add(incrementalTask);
}
@@ -179,7 +179,7 @@ public final class RuleAlteredJobPreparer {
return positionOptional.get();
}
}
- String databaseType = taskConfig.getHandleConfig().getSourceDatabaseType();
+ String databaseType = taskConfig.getJobConfig().getSourceDatabaseType();
DataSource dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig());
return PositionInitializerFactory.getPositionInitializer(databaseType).init(dataSource);
}
@@ -189,7 +189,7 @@ public final class RuleAlteredJobPreparer {
*
* @param jobConfig job configuration
*/
- public void cleanup(final JobConfiguration jobConfig) {
+ public void cleanup(final RuleAlteredJobConfiguration jobConfig) {
try {
cleanup0(jobConfig);
} catch (final SQLException ex) {
@@ -197,11 +197,11 @@ public final class RuleAlteredJobPreparer {
}
}
- private void cleanup0(final JobConfiguration jobConfig) throws SQLException {
- DatabaseType databaseType = DatabaseTypeRegistry.getActualDatabaseType(jobConfig.getHandleConfig().getSourceDatabaseType());
+ private void cleanup0(final RuleAlteredJobConfiguration jobConfig) throws SQLException {
+ DatabaseType databaseType = DatabaseTypeRegistry.getActualDatabaseType(jobConfig.getSourceDatabaseType());
PositionInitializer positionInitializer = PositionInitializerFactory.getPositionInitializer(databaseType.getName());
ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory
- .newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
+ .newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
for (DataSourceProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceDataSourceConfig.getRootConfig()).values()) {
try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
positionInitializer.destroy(dataSource);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index b6718c37502..4043474cafc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -104,7 +104,7 @@ public final class RuleAlteredJobScheduler implements Runnable {
log.error("Inventory task execute failed.", throwable);
stop();
jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
- ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getWorkflowConfig().getDatabaseName());
+ ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
ShardingSphereEventBus.getInstance().post(event);
}
};
@@ -138,7 +138,7 @@ public final class RuleAlteredJobScheduler implements Runnable {
log.error("Incremental task execute failed.", throwable);
stop();
jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
- ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getWorkflowConfig().getDatabaseName());
+ ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
ShardingSphereEventBus.getInstance().post(event);
}
};
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index c2cf39f75e2..89ea2b67447 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -22,11 +22,8 @@ import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.HandleConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -122,11 +119,11 @@ public final class RuleAlteredJobWorker {
* @param jobConfig job configuration
* @return rule altered context
*/
- public static RuleAlteredContext createRuleAlteredContext(final JobConfiguration jobConfig) {
+ public static RuleAlteredContext createRuleAlteredContext(final RuleAlteredJobConfiguration jobConfig) {
YamlRootConfiguration targetRootConfig = getYamlRootConfig(jobConfig);
YamlRuleConfiguration yamlRuleConfig = null;
for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
- if (jobConfig.getWorkflowConfig().getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) {
+ if (jobConfig.getAlteredRuleYamlClassNameTablesMap().containsKey(each.getClass().getName())) {
yamlRuleConfig = each;
break;
}
@@ -151,14 +148,12 @@ public final class RuleAlteredJobWorker {
* @param jobConfig job configuration
* @return YAML root configuration
*/
- private static YamlRootConfiguration getYamlRootConfig(final JobConfiguration jobConfig) {
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
+ private static YamlRootConfiguration getYamlRootConfig(final RuleAlteredJobConfiguration jobConfig) {
+ PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
if (targetDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
return ((ShardingSpherePipelineDataSourceConfiguration) targetDataSourceConfig).getRootConfig();
}
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
+ PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
return ((ShardingSpherePipelineDataSourceConfiguration) sourceDataSourceConfig).getRootConfig();
}
@@ -174,7 +169,7 @@ public final class RuleAlteredJobWorker {
log.warn("There is uncompleted job with the same database name, please handle it first, current job will be ignored");
return;
}
- Optional<JobConfiguration> jobConfigOptional = createJobConfig(event);
+ Optional<RuleAlteredJobConfiguration> jobConfigOptional = createJobConfig(event);
if (jobConfigOptional.isPresent()) {
PipelineJobAPIFactory.newInstance().start(jobConfigOptional.get());
} else {
@@ -185,7 +180,7 @@ public final class RuleAlteredJobWorker {
}
}
- private Optional<JobConfiguration> createJobConfig(final StartScalingEvent event) {
+ private Optional<RuleAlteredJobConfiguration> createJobConfig(final StartScalingEvent event) {
YamlRootConfiguration sourceRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getSourceDataSource(), event.getSourceRule());
YamlRootConfiguration targetRootConfig = getYamlRootConfiguration(event.getDatabaseName(), event.getTargetDataSource(), event.getTargetRule());
Map<String, List<String>> alteredRuleYamlClassNameTablesMap = new HashMap<>();
@@ -209,9 +204,14 @@ public final class RuleAlteredJobWorker {
log.error("more than 1 rule altered");
throw new PipelineJobCreationException("more than 1 rule altered");
}
- WorkflowConfiguration workflowConfig = new WorkflowConfiguration(event.getDatabaseName(), alteredRuleYamlClassNameTablesMap, event.getActiveVersion(), event.getNewVersion());
- PipelineConfiguration pipelineConfig = getPipelineConfiguration(sourceRootConfig, targetRootConfig);
- return Optional.of(new JobConfiguration(workflowConfig, pipelineConfig));
+ RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+ result.setDatabaseName(event.getDatabaseName());
+ result.setAlteredRuleYamlClassNameTablesMap(alteredRuleYamlClassNameTablesMap);
+ result.setActiveVersion(event.getActiveVersion());
+ result.setNewVersion(event.getNewVersion());
+ result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
+ result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
+ return Optional.of(result);
}
private Collection<Pair<YamlRuleConfiguration, YamlRuleConfiguration>> groupSourceTargetRuleConfigsByType(final Collection<YamlRuleConfiguration> sourceRules,
@@ -231,13 +231,6 @@ public final class RuleAlteredJobWorker {
return result;
}
- private PipelineConfiguration getPipelineConfiguration(final YamlRootConfiguration sourceRootConfig, final YamlRootConfiguration targetRootConfig) {
- PipelineConfiguration result = new PipelineConfiguration();
- result.setSource(createYamlPipelineDataSourceConfiguration(sourceRootConfig));
- result.setTarget(createYamlPipelineDataSourceConfiguration(targetRootConfig));
- return result;
- }
-
private YamlPipelineDataSourceConfiguration createYamlPipelineDataSourceConfiguration(final YamlRootConfiguration yamlConfig) {
PipelineDataSourceConfiguration config = new ShardingSpherePipelineDataSourceConfiguration(yamlConfig);
YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
@@ -280,14 +273,12 @@ public final class RuleAlteredJobWorker {
/**
* Build task configuration.
*
- * @param pipelineConfig pipeline configuration
- * @param handleConfig handle configuration
+ * @param jobConfig job configuration
* @param onRuleAlteredActionConfig action configuration
* @return task configuration
*/
- public static TaskConfiguration buildTaskConfig(final PipelineConfiguration pipelineConfig,
- final HandleConfiguration handleConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
- return RuleAlteredJobConfigurationPreparerFactory.newInstance().createTaskConfiguration(pipelineConfig, handleConfig, onRuleAlteredActionConfig);
+ public static TaskConfiguration buildTaskConfig(final RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+ return RuleAlteredJobConfigurationPreparerFactory.newInstance().createTaskConfiguration(jobConfig, onRuleAlteredActionConfig);
}
private boolean hasUncompletedJobOfSameDatabaseName(final String databaseName) {
@@ -297,8 +288,8 @@ public final class RuleAlteredJobWorker {
.allMatch(progress -> null != progress && progress.getStatus().equals(JobStatus.FINISHED))) {
continue;
}
- JobConfiguration jobConfiguration = YamlEngine.unmarshal(each.getJobParameter(), JobConfiguration.class, true);
- if (hasUncompletedJobOfSameDatabaseName(jobConfiguration, each.getJobId(), databaseName)) {
+ RuleAlteredJobConfiguration jobConfig = YamlEngine.unmarshal(each.getJobParameter(), RuleAlteredJobConfiguration.class, true);
+ if (hasUncompletedJobOfSameDatabaseName(jobConfig, each.getJobId(), databaseName)) {
result = true;
break;
}
@@ -306,14 +297,8 @@ public final class RuleAlteredJobWorker {
return !result;
}
- private boolean hasUncompletedJobOfSameDatabaseName(final JobConfiguration jobConfig, final String jobId, final String currentDatabaseName) {
- HandleConfiguration handleConfig = jobConfig.getHandleConfig();
- WorkflowConfiguration workflowConfig;
- if (null == handleConfig || null == (workflowConfig = jobConfig.getWorkflowConfig())) {
- log.warn("handleConfig or workflowConfig null, jobId={}", jobId);
- return false;
- }
- return currentDatabaseName.equals(workflowConfig.getDatabaseName());
+ private boolean hasUncompletedJobOfSameDatabaseName(final RuleAlteredJobConfiguration jobConfig, final String jobId, final String currentDatabaseName) {
+ return currentDatabaseName.equals(jobConfig.getDatabaseName());
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 851749b0189..cc8c76f4707 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -177,8 +177,8 @@ public final class InventoryTaskSplitter {
private Collection<IngestPosition<?>> getPositionByPrimaryKeyRange(final RuleAlteredJobContext jobContext, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
Collection<IngestPosition<?>> result = new ArrayList<>();
- JobConfiguration jobConfig = jobContext.getJobConfig();
- String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getSourceDatabaseType())
+ RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
+ String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getSourceDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
try (
Connection connection = dataSource.getConnection();
@@ -186,7 +186,7 @@ public final class InventoryTaskSplitter {
long beginId = 0;
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ps.setLong(1, beginId);
- ps.setLong(2, jobConfig.getHandleConfig().getShardingSize());
+ ps.setLong(2, jobConfig.getShardingSize());
try (ResultSet rs = ps.executeQuery()) {
if (!rs.next()) {
log.info("getPositionByPrimaryKeyRange, rs.next false, break");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index f01419f6db0..dccd3b0c979 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.job.environment;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -43,15 +43,15 @@ public final class ScalingEnvironmentManager {
* @throws SQLException SQL exception
*/
// TODO seems it should be removed, dangerous to use
- public void cleanupTargetTables(final JobConfiguration jobConfig) throws SQLException {
- Collection<String> tables = jobConfig.getHandleConfig().splitLogicTableNames();
+ public void cleanupTargetTables(final RuleAlteredJobConfiguration jobConfig) throws SQLException {
+ Collection<String> tables = jobConfig.splitLogicTableNames();
log.info("cleanupTargetTables, tables={}", tables);
- YamlPipelineDataSourceConfiguration target = jobConfig.getPipelineConfig().getTarget();
+ YamlPipelineDataSourceConfiguration target = jobConfig.getTarget();
try (
PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
Connection connection = dataSource.getConnection()) {
for (String each : tables) {
- String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
+ String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getTargetDatabaseType()).buildTruncateSQL(each);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 0657f5fb720..32dced0adf4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
@@ -44,11 +44,11 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
- PipelineConfiguration pipelineConfig = parameter.getPipelineConfiguration();
+ RuleAlteredJobConfiguration jobConfig = parameter.getJobConfig();
PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
try (
- Connection sourceConnection = getSourceCachedDataSource(pipelineConfig, dataSourceManager).getConnection();
- Connection targetConnection = getTargetCachedDataSource(pipelineConfig, dataSourceManager).getConnection()) {
+ Connection sourceConnection = getSourceCachedDataSource(jobConfig, dataSourceManager).getConnection();
+ Connection targetConnection = getTargetCachedDataSource(jobConfig, dataSourceManager).getConnection()) {
Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
for (String each : logicTableNames) {
String createTableSQL = getCreateTableSQL(sourceConnection, each);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index ccf739ffa5e..9ae4828f30e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -50,7 +50,7 @@ public final class MySQLDataSourcePreparerTest {
private PrepareTargetTablesParameter prepareTargetTablesParameter;
@Mock
- private PipelineConfiguration pipelineConfig;
+ private RuleAlteredJobConfiguration jobConfig;
@Mock
private YamlPipelineDataSourceConfiguration sourceYamlPipelineDataSourceConfiguration;
@@ -76,13 +76,13 @@ public final class MySQLDataSourcePreparerTest {
when(mockPipelineDataSourceManager.getDataSource(same(sourceScalingDataSourceConfig))).thenReturn(sourceDataSourceWrapper);
when(mockPipelineDataSourceManager.getDataSource(same(targetScalingDataSourceConfig))).thenReturn(targetDataSourceWrapper);
when(prepareTargetTablesParameter.getDataSourceManager()).thenReturn(mockPipelineDataSourceManager);
- when(pipelineConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfiguration);
- when(pipelineConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
- when(pipelineConfig.getSource().getParameter()).thenReturn("source");
- when(pipelineConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfiguration);
- when(pipelineConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
- when(pipelineConfig.getTarget().getParameter()).thenReturn("target");
- when(prepareTargetTablesParameter.getPipelineConfiguration()).thenReturn(pipelineConfig);
+ when(jobConfig.getSource()).thenReturn(sourceYamlPipelineDataSourceConfiguration);
+ when(jobConfig.getSource().getType()).thenReturn("ShardingSphereJDBC");
+ when(jobConfig.getSource().getParameter()).thenReturn("source");
+ when(jobConfig.getTarget()).thenReturn(targetYamlPipelineDataSourceConfiguration);
+ when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
+ when(jobConfig.getTarget().getParameter()).thenReturn("target");
+ when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new JobDataNodeLine(Collections.emptyList()));
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index c0690d3170d..0dcdf686db0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -60,7 +60,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
}
Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
- try (Connection targetConnection = getTargetCachedDataSource(parameter.getPipelineConfiguration(), parameter.getDataSourceManager()).getConnection()) {
+ try (Connection targetConnection = getTargetCachedDataSource(parameter.getJobConfig(), parameter.getDataSourceManager()).getConnection()) {
for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
for (String each : entry.getValue()) {
executeTargetTableSQL(targetConnection, each);
@@ -75,7 +75,7 @@ public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePrepare
private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
Collection<ActualTableDefinition> result = new ArrayList<>();
ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(
- parameter.getPipelineConfiguration().getSource().getType(), parameter.getPipelineConfiguration().getSource().getParameter());
+ parameter.getJobConfig().getSource().getType(), parameter.getJobConfig().getSource().getParameter());
// TODO reuse PipelineDataSourceManager
try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSource [...]
index 8c55316f395..638cb4e3c00 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -83,10 +83,8 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
}
Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
-
try (
- Connection targetConnection = getTargetCachedDataSource(parameter.getPipelineConfiguration(),
- parameter.getDataSourceManager()).getConnection()) {
+ Connection targetConnection = getTargetCachedDataSource(parameter.getJobConfig(), parameter.getDataSourceManager()).getConnection()) {
for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
for (String each : entry.getValue()) {
executeTargetTableSQL(targetConnection, each);
@@ -124,8 +122,8 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
Collection<ActualTableDefinition> result = new LinkedList<>();
ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig =
- (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(parameter.getPipelineConfiguration().getSource().getType(),
- parameter.getPipelineConfiguration().getSource().getParameter());
+ (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(parameter.getJobConfig().getSource().getType(),
+ parameter.getJobConfig().getSource().getParameter());
try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
DataNode dataNode = each.getDataNodes().get(0);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
index 0fa33438045..30a8a7ff919 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/env/ITEnvironmentContext.java
@@ -20,8 +20,7 @@ package org.apache.shardingsphere.integration.data.pipeline.env;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -82,11 +81,9 @@ public final class ITEnvironmentContext {
}
private static String createScalingConfiguration(final Map<String, YamlTableRuleConfiguration> tableRules) {
- PipelineConfiguration pipelineConfig = new PipelineConfiguration();
- pipelineConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
- pipelineConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
- JobConfiguration jobConfig = new JobConfiguration();
- jobConfig.setPipelineConfig(pipelineConfig);
+ RuleAlteredJobConfiguration jobConfig = new RuleAlteredJobConfiguration();
+ jobConfig.setSource(createYamlPipelineDataSourceConfiguration(SourceConfiguration.getDockerConfiguration(tableRules)));
+ jobConfig.setTarget(createYamlPipelineDataSourceConfiguration(TargetConfiguration.getDockerConfiguration()));
return new Gson().toJson(jobConfig);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 997b352a08f..e2e14c205c9 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -24,8 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -130,12 +129,12 @@ public final class RuleAlteredJobAPIImplTest {
public void assertDataConsistencyCheck() {
Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- JobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
- if (null == jobConfig.getPipelineConfig()) {
- log.error("pipelineConfig is null, jobConfig={}", YamlEngine.marshal(jobConfig));
+ RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
+ if (null == jobConfig.getSource()) {
+ log.error("source is null, jobConfig={}", YamlEngine.marshal(jobConfig));
}
- initTableData(jobConfig.getPipelineConfig());
- String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+ initTableData(jobConfig);
+ String databaseName = jobConfig.getDatabaseName();
ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId.get());
Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get());
ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId.get());
@@ -146,9 +145,9 @@ public final class RuleAlteredJobAPIImplTest {
public void assertDataConsistencyCheckWithAlgorithm() {
Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- JobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
- initTableData(jobConfig.getPipelineConfig());
- String databaseName = jobConfig.getWorkflowConfig().getDatabaseName();
+ RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
+ initTableData(jobConfig);
+ String databaseName = jobConfig.getDatabaseName();
ruleAlteredJobAPI.stopClusterWriteDB(databaseName, jobId.get());
Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE");
ruleAlteredJobAPI.restoreClusterWriteDB(databaseName, jobId.get());
@@ -197,11 +196,11 @@ public final class RuleAlteredJobAPIImplTest {
@Test(expected = PipelineVerifyFailedException.class)
public void assertSwitchClusterConfigurationAlreadyFinished() {
- final JobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
- Optional<String> jobId = ruleAlteredJobAPI.start(jobConfiguration);
+ final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
final GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfiguration);
+ RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
jobContext.setInitProgress(new JobProgress());
repositoryAPI.persistJobProgress(jobContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
@@ -211,12 +210,12 @@ public final class RuleAlteredJobAPIImplTest {
@Test
public void assertSwitchClusterConfigurationSucceed() {
- final JobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
- jobConfiguration.getHandleConfig().setJobShardingItem(0);
- Optional<String> jobId = ruleAlteredJobAPI.start(jobConfiguration);
+ final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ jobConfig.setJobShardingItem(0);
+ Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfiguration);
+ RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
jobContext.setInitProgress(new JobProgress());
repositoryAPI.persistJobProgress(jobContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
@@ -232,8 +231,8 @@ public final class RuleAlteredJobAPIImplTest {
public void assertResetTargetTable() {
Optional<String> jobId = ruleAlteredJobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- JobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
- initTableData(jobConfig.getPipelineConfig());
+ RuleAlteredJobConfiguration jobConfig = ruleAlteredJobAPI.getJobConfig(jobId.get());
+ initTableData(jobConfig);
ruleAlteredJobAPI.stop(jobId.get());
ruleAlteredJobAPI.reset(jobId.get());
Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobConfig);
@@ -241,10 +240,10 @@ public final class RuleAlteredJobAPIImplTest {
}
@SneakyThrows(SQLException.class)
- private void initTableData(final PipelineConfiguration pipelineConfig) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getSource().getType(), pipelineConfig.getSource().getParameter());
+ private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
+ PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
initTableData(PipelineDataSourceCreatorFactory.getInstance(sourceDataSourceConfig.getType()).createPipelineDataSource(sourceDataSourceConfig.getDataSourceConfiguration()));
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(pipelineConfig.getTarget().getType(), pipelineConfig.getTarget().getParameter());
+ PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
initTableData(PipelineDataSourceCreatorFactory.getInstance(targetDataSourceConfig.getType()).createPipelineDataSource(targetDataSourceConfig.getDataSourceConfiguration()));
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index 9ba77b75ddc..2522239fc4f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCalculateAlgorithm;
@@ -52,7 +52,7 @@ public final class DataConsistencyCheckerTest {
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
}
- private JobConfiguration createJobConfiguration() throws SQLException {
+ private RuleAlteredJobConfiguration createJobConfiguration() throws SQLException {
RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
index d63686a922e..720de47b899 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue;
public final class PipelineDataSourceManagerTest {
- private JobConfiguration jobConfig;
+ private RuleAlteredJobConfiguration jobConfig;
@Before
public void setUp() {
@@ -47,7 +47,7 @@ public final class PipelineDataSourceManagerTest {
public void assertGetDataSource() {
PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
DataSource actual = dataSourceManager.getDataSource(
- PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter()));
+ PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
}
@@ -55,9 +55,9 @@ public final class PipelineDataSourceManagerTest {
public void assertClose() throws NoSuchFieldException, IllegalAccessException {
try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
dataSourceManager.getDataSource(
- PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter()));
+ PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter()));
dataSourceManager.getDataSource(
- PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter()));
+ PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter()));
Map<?, ?> cachedDataSources = ReflectionUtil.getFieldValue(dataSourceManager, "cachedDataSources", Map.class);
assertNotNull(cachedDataSources);
assertThat(cachedDataSources.size(), is(2));
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index bc4a0bced79..484c998ec10 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -20,9 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import com.google.common.collect.ImmutableMap;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -46,17 +44,19 @@ public final class JobConfigurationBuilder {
*
* @return created job configuration
*/
- public static JobConfiguration createJobConfiguration() {
- JobConfiguration result = new JobConfiguration();
- result.setWorkflowConfig(new WorkflowConfiguration("logic_db", ImmutableMap.of(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")), 0, 1));
- PipelineConfiguration pipelineConfig = new PipelineConfiguration();
- pipelineConfig.setSource(createYamlPipelineDataSourceConfiguration(
+ public static RuleAlteredJobConfiguration createJobConfiguration() {
+ RuleAlteredJobConfiguration result = new RuleAlteredJobConfiguration();
+ result.setDatabaseName("logic_db");
+ result.setAlteredRuleYamlClassNameTablesMap(ImmutableMap.of(YamlShardingRuleConfiguration.class.getName(), Collections.singletonList("t_order")));
+ result.setActiveVersion(0);
+ result.setNewVersion(1);
+ // TODO add autoTables in config file
+ result.setSource(createYamlPipelineDataSourceConfiguration(
new ShardingSpherePipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"))));
- pipelineConfig.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
- result.setPipelineConfig(pipelineConfig);
+ result.setTarget(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("config_standard_jdbc_target.yaml"))));
result.buildHandleConfig();
int activeVersion = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 10) + 1;
- result.getHandleConfig().setJobId(generateJobId(activeVersion, "logic_db"));
+ result.setJobId(generateJobId(activeVersion, "logic_db"));
return result;
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
index 5e2b7845a8f..1600b0127f1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
@@ -50,7 +50,7 @@ public final class RuleAlteredJobTest {
@SuppressWarnings("unchecked")
@SneakyThrows(ReflectiveOperationException.class)
public void assertExecute() {
- JobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
ShardingContext shardingContext = new ShardingContext("1", null, 2, YamlEngine.marshal(jobConfig), 0, null);
new RuleAlteredJob().execute(shardingContext);
@@ -60,8 +60,8 @@ public final class RuleAlteredJobTest {
}
@SneakyThrows(SQLException.class)
- private void initTableData(final JobConfiguration jobConfig) {
- YamlPipelineDataSourceConfiguration source = jobConfig.getPipelineConfig().getSource();
+ private void initTableData(final RuleAlteredJobConfiguration jobConfig) {
+ YamlPipelineDataSourceConfiguration source = jobConfig.getSource();
try (
PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
Connection connection = dataSource.getConnection();
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 5e8ad07de13..8c1736ec663 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -17,10 +17,8 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
-import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -41,6 +39,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
+import java.util.Collections;
import java.util.Optional;
import static org.junit.Assert.assertFalse;
@@ -56,8 +55,8 @@ public final class RuleAlteredJobWorkerTest {
@Test(expected = PipelineJobCreationException.class)
public void assertCreateRuleAlteredContextNoAlteredRule() {
- JobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- jobConfig.setWorkflowConfig(new WorkflowConfiguration("logic_db", ImmutableMap.of(), 0, 1));
+ RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ jobConfig.setAlteredRuleYamlClassNameTablesMap(Collections.emptyMap());
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
}
@@ -90,8 +89,8 @@ public final class RuleAlteredJobWorkerTest {
// TODO improve assertHasUncompletedJob, refactor hasUncompletedJobOfSameDatabaseName for easier unit test
// @Test
public void assertHasUncompletedJob() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException {
- final JobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
- RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfiguration);
+ final RuleAlteredJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ RuleAlteredJobContext jobContext = new RuleAlteredJobContext(jobConfig);
jobContext.setStatus(JobStatus.PREPARING);
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
repositoryAPI.persistJobProgress(jobContext);
@@ -99,7 +98,7 @@ public final class RuleAlteredJobWorkerTest {
assertNotNull(jobConfigUrl);
repositoryAPI.persist(PipelineMetaDataNode.getJobConfigPath(jobContext.getJobId()), FileUtils.readFileToString(new File(jobConfigUrl.getFile())));
Object result = ReflectionUtil.invokeMethod(new RuleAlteredJobWorker(), "hasUncompletedJobOfSameDatabaseName", new Class[]{String.class},
- new String[]{jobConfiguration.getWorkflowConfig().getDatabaseName()});
+ new String[]{jobConfig.getDatabaseName()});
assertFalse((Boolean) result);
}
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index cd63cb74f82..5d60ea5c0f0 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -68,7 +68,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithEmptyTable() throws SQLException {
- taskConfig.getHandleConfig().setShardingSize(10);
+ taskConfig.getJobConfig().setShardingSize(10);
initEmptyTablePrimaryEnvironment(taskConfig.getDumperConfig());
List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
assertNotNull(actual);
@@ -79,7 +79,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
- taskConfig.getHandleConfig().setShardingSize(10);
+ taskConfig.getJobConfig().setShardingSize(10);
initIntPrimaryEnvironment(taskConfig.getDumperConfig());
List<InventoryTask> actual = inventoryTaskSplitter.splitInventoryData(jobContext);
assertNotNull(actual);