You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/01/28 08:25:52 UTC
[shardingsphere] branch master updated: Rename ScalingConfiguration and JobConfiguration (#9202)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f33436e Rename ScalingConfiguration and JobConfiguration (#9202)
f33436e is described below
commit f33436e1638ab2015b17a452c57c46ba5da61b5c
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jan 28 16:25:23 2021 +0800
Rename ScalingConfiguration and JobConfiguration (#9202)
* Rename JobConfiguration to HandleConfiguration
* Rename ScalingConfiguration to JobConfiguration
* Rename ScalingConfiguration to JobConfiguration
Co-authored-by: qiulu3 <Lucas209910>
---
.../util/ServerConfigurationInitializer.java | 6 +-
.../scaling/web/HttpServerHandler.java | 4 +-
.../scaling/web/HttpServerHandlerTest.java | 6 +-
...he.shardingsphere.scaling.core.spi.ScalingEntry | 18 -----
.../src/test/resources/config.json | 40 -----------
.../config_sharding_sphere_jdbc_target.json | 38 ----------
.../src/test/resources/logback-test.xml | 3 +-
.../core/api/RegistryRepositoryAPIImpl.java | 2 +-
...Configuration.java => HandleConfiguration.java} | 22 ++++--
.../scaling/core/config/JobConfiguration.java | 20 +-----
.../scaling/core/config/TaskConfiguration.java | 2 +-
.../scaling/core/job/ScalingJob.java | 14 ++--
.../job/check/AbstractDataConsistencyChecker.java | 4 +-
.../environmental/ScalingEnvironmentalManager.java | 2 +-
.../core/job/preparer/ScalingJobPreparer.java | 8 +--
.../preparer/splitter/InventoryTaskSplitter.java | 4 +-
.../core/service/AbstractScalingJobService.java | 16 ++---
.../scaling/core/service/ScalingJobService.java | 6 +-
.../service/impl/DistributedScalingJobService.java | 34 ++++-----
.../service/impl/StandaloneScalingJobService.java | 6 +-
...gurationUtil.java => JobConfigurationUtil.java} | 34 ++++-----
.../scaling/core/utils/ScalingTaskUtil.java | 8 +--
.../scaling/core/utils/TaskConfigurationUtil.java | 60 ++++++++--------
.../core/datasource/DataSourceManagerTest.java | 4 +-
.../check/AbstractDataConsistencyCheckerTest.java | 4 +-
.../splitter/InventoryTaskSplitterTest.java | 12 ++--
.../core/job/task/inventory/InventoryTaskTest.java | 4 +-
.../impl/DistributedScalingJobServiceTest.java | 28 ++++----
.../impl/StandaloneScalingJobServiceTest.java | 16 ++---
.../scaling/core/util/JobConfigurationUtil.java} | 28 +++++---
.../core/util/ScalingConfigurationUtil.java | 81 ----------------------
.../src/test/resources/config.json | 4 +-
.../config_sharding_sphere_jdbc_target.json | 4 +-
.../elasticjob/ElasticJobScalingWorker.java | 58 ++++++++--------
.../scaling/elasticjob/job/FinishedCheckJob.java | 4 +-
.../scaling/elasticjob/job/ScalingElasticJob.java | 16 ++---
36 files changed, 227 insertions(+), 393 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/util/ServerConfigurationInitializer.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/util/ServerConfigurationInitializer.java
index cb4b899..08353ad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/util/ServerConfigurationInitializer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/util/ServerConfigurationInitializer.java
@@ -32,7 +32,7 @@ import java.io.File;
import java.io.IOException;
/**
- * Scaling configuration initializer.
+ * Scaling server configuration initializer.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
@@ -41,11 +41,11 @@ public final class ServerConfigurationInitializer {
private static final String SERVER_FILE = "conf/server.yaml";
/**
- * Init scaling configuration.
+ * Init server configuration.
*/
@SneakyThrows(IOException.class)
public static void init() {
- log.info("Initialize scaling configuration.");
+ log.info("Initialize server configuration.");
File yamlFile = new File(Resources.getResource(SERVER_FILE).getPath());
YamlServerConfiguration serverConfig = YamlEngine.unmarshal(yamlFile, YamlServerConfiguration.class);
Preconditions.checkNotNull(serverConfig, "Server configuration file `%s` is invalid.", yamlFile.getName());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 323b038..fd55565 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -34,7 +34,7 @@ import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
@@ -89,7 +89,7 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
}
private void startJob(final ChannelHandlerContext context, final String requestBody) {
- Optional<ScalingJob> scalingJob = scalingJobService.start(GSON.fromJson(requestBody, ScalingConfiguration.class));
+ Optional<ScalingJob> scalingJob = scalingJobService.start(GSON.fromJson(requestBody, JobConfiguration.class));
if (scalingJob.isPresent()) {
response(ResponseContentUtil.build(scalingJob.get()), context, HttpResponseStatus.OK);
return;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index 1d091e47..009b3df 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -30,7 +30,7 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
@@ -80,14 +80,14 @@ public final class HttpServerHandlerTest {
@Test
public void assertStartJobSuccess() {
- when(scalingJobService.start(any(ScalingConfiguration.class))).thenReturn(Optional.of(new ScalingJob()));
+ when(scalingJobService.start(any(JobConfiguration.class))).thenReturn(Optional.of(new ScalingJob()));
ResponseContent<?> responseContent = execute("/scaling/job/start");
assertTrue(responseContent.isSuccess());
}
@Test
public void assertStartJobFailure() {
- when(scalingJobService.start(any(ScalingConfiguration.class))).thenReturn(Optional.empty());
+ when(scalingJobService.start(any(JobConfiguration.class))).thenReturn(Optional.empty());
ResponseContent<?> responseContent = execute("/scaling/job/start");
assertFalse(responseContent.isSuccess());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingEntry b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingEntry
deleted file mode 100644
index be286bf..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingEntry
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.scaling.fixture.FixtureH2ScalingEntry
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/config.json b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/config.json
deleted file mode 100644
index 28fcf97..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/config.json
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
- "ruleConfiguration": {
- "source": {
- "type": "shardingSphereJdbc",
- "parameter": {
- "dataSource": "dataSources:\n ds_0:\n dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n props:\n jdbcUrl: jdbc:h2:mem:test_db_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL\n username: root\n password: 'password'\n connectionTimeout: 30000\n idleTimeout: 60000\n maxLifetime: 1800000\n maxPoolSize: 50\n minPoolSize: 1\n maintenanceIntervalMilliseconds: 30000\n readOnly: false\n",
- "rule": "rules:\n- !SHARDING\n defaultDatabaseStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: user_id\n tables:\n t1:\n actualDataNodes: ds_0.t1\n keyGenerateStrategy:\n column: order_id\n logicTable: t1\n tableStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: order_id\n t2:\n actualDataNodes: ds_0.t2\n keyGenerateStrategy:\n column: order_item_i [...]
- }
- },
- "target": {
- "type": "jdbc",
- "parameter": {
- "name": "dt_0",
- "jdbcUrl": "jdbc:h2:mem:test_db_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
- "username": "root",
- "password": "password"
- }
- }
- },
- "jobConfiguration": {
- "concurrency": 3
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/config_sharding_sphere_jdbc_target.json b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/config_sharding_sphere_jdbc_target.json
deleted file mode 100644
index df589d9..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/config_sharding_sphere_jdbc_target.json
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
- "ruleConfiguration": {
- "source": {
- "type": "shardingSphereJdbc",
- "parameter": {
- "dataSource": "dataSources:\n ds_0:\n dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n props:\n jdbcUrl: jdbc:h2:mem:test_db_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL\n username: root\n password: 'password'\n connectionTimeout: 30000\n idleTimeout: 60000\n maxLifetime: 1800000\n maxPoolSize: 50\n minPoolSize: 1\n maintenanceIntervalMilliseconds: 30000\n readOnly: false\n",
- "rule": "rules:\n- !SHARDING\n defaultDatabaseStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: user_id\n tables:\n t1:\n actualDataNodes: ds_0.t1\n keyGenerateStrategy:\n column: order_id\n logicTable: t1\n tableStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: order_id\n t2:\n actualDataNodes: ds_0.t2\n keyGenerateStrategy:\n column: order_item_i [...]
- }
- },
- "target": {
- "type": "shardingSphereJdbc",
- "parameter": {
- "dataSource": "dataSources:\n ds_0:\n dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n props:\n jdbcUrl: jdbc:h2:mem:test_db_2;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL\n username: root\n password: 'password'\n connectionTimeout: 30000\n idleTimeout: 60000\n maxLifetime: 1800000\n maxPoolSize: 50\n minPoolSize: 1\n maintenanceIntervalMilliseconds: 30000\n readOnly: false\n",
- "rule": "rules:\n- !SHARDING\n defaultDatabaseStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: user_id\n tables:\n t1:\n actualDataNodes: ds_0.t1\n keyGenerateStrategy:\n column: order_id\n logicTable: t1\n tableStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: order_id\n t2:\n actualDataNodes: ds_0.t2\n keyGenerateStrategy:\n column: order_item_i [...]
- }
- }
- },
- "jobConfiguration": {
- "concurrency": 3
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/logback-test.xml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/logback-test.xml
index 57fd999..7ebf67c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/logback-test.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/resources/logback-test.xml
@@ -27,8 +27,7 @@
</logger>
<logger name="com.zaxxer.hikari" level="warn" />
<logger name="org.apache.shardingsphere.scaling.web.HttpServerHandler" level="off" />
- <logger name="org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryTaskSplitter" level="off" />
-
+
<root>
<level value="info" />
<appender-ref ref="console" />
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java
index 54c3ce9..70982b2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java
@@ -40,7 +40,7 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
public void persistJobPosition(final ScalingJob scalingJob) {
JobPosition jobPosition = new JobPosition();
jobPosition.setStatus(scalingJob.getStatus());
- jobPosition.setDatabaseType(scalingJob.getScalingConfig().getJobConfiguration().getDatabaseType());
+ jobPosition.setDatabaseType(scalingJob.getJobConfig().getHandleConfig().getDatabaseType());
jobPosition.setIncrementalPositions(scalingJob.getIncrementalTasks().stream().collect(Collectors.toMap(ScalingTask::getTaskId, ScalingTask::getPosition)));
jobPosition.setInventoryPositions(scalingJob.getInventoryTasks().stream().collect(Collectors.toMap(ScalingTask::getTaskId, ScalingTask::getPosition)));
REGISTRY_REPOSITORY.persist(ScalingTaskUtil.getScalingListenerPath(scalingJob.getJobId(), scalingJob.getShardingItem()), jobPosition.toJson());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
similarity index 68%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingConfiguration.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
index c1acf43..1f8e325 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/HandleConfiguration.java
@@ -21,13 +21,27 @@ import lombok.Getter;
import lombok.Setter;
/**
- * Scaling configuration.
+ * Handle configuration.
*/
@Setter
@Getter
-public final class ScalingConfiguration {
+public final class HandleConfiguration {
- private RuleConfiguration ruleConfiguration;
+ private Long jobId;
- private JobConfiguration jobConfiguration;
+ private int concurrency = 3;
+
+ private int retryTimes = 3;
+
+ private String[] shardingTables;
+
+ private Integer shardingItem;
+
+ private int shardingSize = 1000 * 10000;
+
+ private boolean running = true;
+
+ private String databaseType;
+
+ private WorkflowConfiguration workflowConfig;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index 82057b5..7505205 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -21,27 +21,13 @@ import lombok.Getter;
import lombok.Setter;
/**
- * Job configuration.
+ * Scaling job configuration.
*/
@Setter
@Getter
public final class JobConfiguration {
- private Long jobId;
+ private RuleConfiguration ruleConfig;
- private int concurrency = 3;
-
- private int retryTimes = 3;
-
- private String[] shardingTables;
-
- private Integer shardingItem;
-
- private int shardingSize = 1000 * 10000;
-
- private boolean running = true;
-
- private String databaseType;
-
- private WorkflowConfiguration workflowConfig;
+ private HandleConfiguration handleConfig;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/TaskConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/TaskConfiguration.java
index 113c30b..6d0bd94 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/TaskConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/TaskConfiguration.java
@@ -27,7 +27,7 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public final class TaskConfiguration {
- private final JobConfiguration jobConfig;
+ private final HandleConfiguration handleConfig;
private final DumperConfiguration dumperConfig;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index ab65d9a..6e7bd77 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.job;
import lombok.Getter;
import lombok.Setter;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.JobPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
@@ -54,7 +54,7 @@ public final class ScalingJob {
private final transient List<ScalingTask> incrementalTasks = new LinkedList<>();
- private transient ScalingConfiguration scalingConfig;
+ private transient JobConfiguration jobConfig;
private String status = JobStatus.RUNNING.name();
@@ -66,11 +66,11 @@ public final class ScalingJob {
this.jobId = jobId;
}
- public ScalingJob(final ScalingConfiguration scalingConfig) {
- this(Optional.ofNullable(scalingConfig.getJobConfiguration().getJobId()).orElse(generateKey()));
- this.scalingConfig = scalingConfig;
- shardingItem = scalingConfig.getJobConfiguration().getShardingItem();
- taskConfigs.addAll(TaskConfigurationUtil.toTaskConfigs(scalingConfig));
+ public ScalingJob(final JobConfiguration jobConfig) {
+ this(Optional.ofNullable(jobConfig.getHandleConfig().getJobId()).orElse(generateKey()));
+ this.jobConfig = jobConfig;
+ shardingItem = jobConfig.getHandleConfig().getShardingItem();
+ taskConfigs.addAll(TaskConfigurationUtil.toTaskConfigs(jobConfig));
}
private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
index 2487b19..5d566c8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
@@ -78,11 +78,11 @@ public abstract class AbstractDataConsistencyChecker implements DataConsistencyC
}
protected DataSourceWrapper getSourceDataSource() {
- return dataSourceFactory.newInstance(scalingJob.getScalingConfig().getRuleConfiguration().getSource().unwrap());
+ return dataSourceFactory.newInstance(scalingJob.getJobConfig().getRuleConfig().getSource().unwrap());
}
protected DataSourceWrapper getTargetDataSource() {
- return dataSourceFactory.newInstance(scalingJob.getScalingConfig().getRuleConfiguration().getTarget().unwrap());
+ return dataSourceFactory.newInstance(scalingJob.getJobConfig().getRuleConfig().getTarget().unwrap());
}
protected abstract ScalingSQLBuilder getSqlBuilder();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java
index ef9d272..f5ed010 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environmental/ScalingEnvironmentalManager.java
@@ -43,7 +43,7 @@ public final class ScalingEnvironmentalManager {
*/
public void resetTargetTable(final ScalingJob scalingJob) throws SQLException {
Set<String> tables = scalingJob.getTaskConfigs().stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet());
- try (DataSourceWrapper dataSource = dataSourceFactory.newInstance(scalingJob.getScalingConfig().getRuleConfiguration().getTarget().unwrap());
+ try (DataSourceWrapper dataSource = dataSourceFactory.newInstance(scalingJob.getJobConfig().getRuleConfig().getTarget().unwrap());
Connection connection = dataSource.getConnection()) {
for (String each : tables) {
try (PreparedStatement preparedStatement = connection.prepareStatement(ScalingSQLBuilderFactory.newInstance(scalingJob.getDatabaseType()).buildTruncateSQL(each))) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index 2013895..b71b0dd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTaskFactory;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
-import org.apache.shardingsphere.scaling.core.utils.ScalingConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.utils.JobConfigurationUtil;
import java.sql.SQLException;
import java.util.LinkedList;
@@ -53,7 +53,7 @@ public final class ScalingJobPreparer {
* @param scalingJob scaling job
*/
public void prepare(final ScalingJob scalingJob) {
- ScalingConfigurationUtil.fillInProperties(scalingJob.getScalingConfig());
+ JobConfigurationUtil.fillInProperties(scalingJob.getJobConfig());
try (DataSourceManager dataSourceManager = new DataSourceManager(scalingJob.getTaskConfigs())) {
checkDataSource(scalingJob, dataSourceManager);
initIncrementalTasks(scalingJob, dataSourceManager);
@@ -92,7 +92,7 @@ public final class ScalingJobPreparer {
private void initIncrementalTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) throws SQLException {
for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
each.getDumperConfig().setPosition(getIncrementalPosition(scalingJob, each, dataSourceManager));
- scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
+ scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getHandleConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
}
}
@@ -100,6 +100,6 @@ public final class ScalingJobPreparer {
if (null != scalingJob.getInitPosition()) {
return scalingJob.getInitPosition().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
}
- return PositionInitializerFactory.newInstance(taskConfig.getJobConfig().getDatabaseType()).init(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
+ return PositionInitializerFactory.newInstance(taskConfig.getHandleConfig().getDatabaseType()).init(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
index a948dd5..467987b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
@@ -150,14 +150,14 @@ public final class InventoryTaskSplitter {
private Collection<Position<?>> getPositionByPrimaryKeyRange(final ScalingJob scalingJob, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
Collection<Position<?>> result = new ArrayList<>();
- String sql = ScalingSQLBuilderFactory.newInstance(scalingJob.getScalingConfig().getJobConfiguration().getDatabaseType())
+ String sql = ScalingSQLBuilderFactory.newInstance(scalingJob.getJobConfig().getHandleConfig().getDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
long beginId = 0;
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ps.setLong(1, beginId);
- ps.setLong(2, scalingJob.getScalingConfig().getJobConfiguration().getShardingSize());
+ ps.setLong(2, scalingJob.getJobConfig().getHandleConfig().getShardingSize());
try (ResultSet rs = ps.executeQuery()) {
rs.next();
long endId = rs.getLong(1);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 162bf64..97747b0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.scaling.core.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
@@ -42,14 +42,14 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
@Override
public Optional<ScalingJob> start(final RuleConfigurationsAlteredEvent event) {
- ScalingConfiguration scalingConfig = new ScalingConfiguration();
- scalingConfig.setRuleConfiguration(new RuleConfiguration(
+ JobConfiguration jobConfig = new JobConfiguration();
+ jobConfig.setRuleConfig(new RuleConfiguration(
new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()),
new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule())));
- JobConfiguration jobConfig = new JobConfiguration();
- jobConfig.setWorkflowConfig(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
- scalingConfig.setJobConfiguration(jobConfig);
- return start(scalingConfig);
+ HandleConfiguration handleConfig = new HandleConfiguration();
+ handleConfig.setWorkflowConfig(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
+ jobConfig.setHandleConfig(handleConfig);
+ return start(jobConfig);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index 89bb13a..caa9a23 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.service;
import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
@@ -43,10 +43,10 @@ public interface ScalingJobService {
/**
* Start scaling job.
*
- * @param scalingConfig scaling job configuration
+ * @param jobConfig job configuration
* @return scaling job
*/
- Optional<ScalingJob> start(ScalingConfiguration scalingConfig);
+ Optional<ScalingJob> start(JobConfiguration jobConfig);
/**
* Start scaling job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index c820f5c..8e9c4e3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -23,7 +23,7 @@ import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
@@ -61,12 +61,12 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
}
@Override
- public Optional<ScalingJob> start(final ScalingConfiguration scalingConfig) {
- TaskConfigurationUtil.fillInShardingTables(scalingConfig);
- if (shouldScaling(scalingConfig)) {
- ScalingJob scalingJob = new ScalingJob(scalingConfig);
+ public Optional<ScalingJob> start(final JobConfiguration jobConfig) {
+ TaskConfigurationUtil.fillInShardingTables(jobConfig);
+ if (shouldScaling(jobConfig)) {
+ ScalingJob scalingJob = new ScalingJob(jobConfig);
checkDataSources(scalingJob);
- updateScalingConfig(scalingJob.getJobId(), scalingConfig);
+ updateJobConfig(scalingJob.getJobId(), jobConfig);
log.info("start scaling job {}", scalingJob.getJobId());
return Optional.of(scalingJob);
}
@@ -83,19 +83,19 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
}
}
- private boolean shouldScaling(final ScalingConfiguration scalingConfig) {
- return scalingConfig.getJobConfiguration().getShardingTables().length > 0;
+ private boolean shouldScaling(final JobConfiguration jobConfig) {
+ return jobConfig.getHandleConfig().getShardingTables().length > 0;
}
@Override
public void stop(final long jobId) {
- ScalingConfiguration scalingConfig = getJob(jobId).getScalingConfig();
- scalingConfig.getJobConfiguration().setRunning(false);
- updateScalingConfig(jobId, scalingConfig);
+ JobConfiguration jobConfig = getJob(jobId).getJobConfig();
+ jobConfig.getHandleConfig().setRunning(false);
+ updateJobConfig(jobId, jobConfig);
}
- private void updateScalingConfig(final long jobId, final ScalingConfiguration scalingConfig) {
- REGISTRY_REPOSITORY.persist(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.CONFIG), GSON.toJson(scalingConfig));
+ private void updateJobConfig(final long jobId, final JobConfiguration jobConfig) {
+ REGISTRY_REPOSITORY.persist(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.CONFIG), GSON.toJson(jobConfig));
}
@Override
@@ -104,14 +104,14 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
if (Strings.isNullOrEmpty(data)) {
throw new ScalingJobNotFoundException(String.format("Can't find scaling job id %s", jobId));
}
- ScalingConfiguration scalingConfig = GSON.fromJson(data, ScalingConfiguration.class);
- scalingConfig.getJobConfiguration().setJobId(jobId);
- return new ScalingJob(scalingConfig);
+ JobConfiguration jobConfig = GSON.fromJson(data, JobConfiguration.class);
+ jobConfig.getHandleConfig().setJobId(jobId);
+ return new ScalingJob(jobConfig);
}
@Override
public JobProgress getProgress(final long jobId) {
- boolean running = getJob(jobId).getScalingConfig().getJobConfiguration().isRunning();
+ boolean running = getJob(jobId).getJobConfig().getHandleConfig().isRunning();
JobProgress result = new JobProgress(jobId, running ? "RUNNING" : "STOPPED");
List<String> shardingItems = REGISTRY_REPOSITORY.getChildrenKeys(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.POSITION));
for (String each : shardingItems) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
index a393ce4..fb223af 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.core.service.impl;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
@@ -52,8 +52,8 @@ public final class StandaloneScalingJobService extends AbstractScalingJobService
}
@Override
- public Optional<ScalingJob> start(final ScalingConfiguration scalingConfig) {
- ScalingJob scalingJob = new ScalingJob(scalingConfig);
+ public Optional<ScalingJob> start(final JobConfiguration jobConfig) {
+ ScalingJob scalingJob = new ScalingJob(jobConfig);
if (scalingJob.getTaskConfigs().isEmpty()) {
return Optional.empty();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JobConfigurationUtil.java
similarity index 86%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JobConfigurationUtil.java
index 2eff56d..e4a3ff3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JobConfigurationUtil.java
@@ -22,8 +22,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ConfigurationYamlConverter;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
@@ -43,9 +43,9 @@ import java.util.function.Function;
import java.util.stream.Collectors;
/**
- * Scaling configuration util.
+ * Job configuration util.
*/
-public final class ScalingConfigurationUtil {
+public final class JobConfigurationUtil {
private static final SnowflakeKeyGenerateAlgorithm ID_AUTO_INCREASE_GENERATOR = initIdAutoIncreaseGenerator();
@@ -60,32 +60,32 @@ public final class ScalingConfigurationUtil {
}
/**
- * Fill in properties for scaling config.
+ * Fill in properties for job configuration.
*
- * @param scalingConfig scaling config
+ * @param jobConfig job configuration
*/
- public static void fillInProperties(final ScalingConfiguration scalingConfig) {
- JobConfiguration jobConfig = scalingConfig.getJobConfiguration();
- if (null == jobConfig.getJobId()) {
- jobConfig.setJobId(generateKey());
+ public static void fillInProperties(final JobConfiguration jobConfig) {
+ HandleConfiguration handleConfig = jobConfig.getHandleConfig();
+ if (null == handleConfig.getJobId()) {
+ handleConfig.setJobId(generateKey());
}
- if (Strings.isNullOrEmpty(jobConfig.getDatabaseType())) {
- jobConfig.setDatabaseType(scalingConfig.getRuleConfiguration().getSource().unwrap().getDatabaseType().getName());
+ if (Strings.isNullOrEmpty(handleConfig.getDatabaseType())) {
+ handleConfig.setDatabaseType(jobConfig.getRuleConfig().getSource().unwrap().getDatabaseType().getName());
}
- if (null == scalingConfig.getJobConfiguration().getShardingTables()) {
- jobConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(scalingConfig)));
+ if (null == jobConfig.getHandleConfig().getShardingTables()) {
+ handleConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(jobConfig)));
}
}
- private static List<String> getShouldScalingActualDataNodes(final ScalingConfiguration scalingConfig) {
- ScalingDataSourceConfiguration sourceConfig = scalingConfig.getRuleConfiguration().getSource().unwrap();
+ private static List<String> getShouldScalingActualDataNodes(final JobConfiguration jobConfig) {
+ ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
"Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
- if (!(scalingConfig.getRuleConfiguration().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
+ if (!(jobConfig.getRuleConfig().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
return getShardingRuleConfigMap(source.getRule()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
}
- ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
+ ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
return getShouldScalingActualDataNodes(getModifiedDataSources(source.getDataSource(), target.getDataSource()),
getShardingRuleConfigMap(source.getRule()), getShardingRuleConfigMap(target.getRule()));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index 5597050..2e1fbda 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.core.utils;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
@@ -45,12 +45,12 @@ public final class ScalingTaskUtil {
* All inventory tasks are finished and the delay of every incremental task is less than the allowed value.
*
* @param jobProgress job progress
- * @param jobConfig job configuration
+ * @param handleConfig handle configuration
* @return almost finished or not
*/
- public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final JobConfiguration jobConfig) {
+ public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final HandleConfiguration handleConfig) {
return jobProgress.getInventoryTaskProgress().stream().allMatch(each -> each.getTotal() == each.getFinished())
- && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < jobConfig.getWorkflowConfig().getAllowDelay());
+ && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < handleConfig.getWorkflowConfig().getAllowDelay());
}
/**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
index ebf3d61..d237efc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
@@ -28,8 +28,8 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ConfigurationYamlConverter;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
@@ -67,59 +67,59 @@ import java.util.stream.Collectors;
public final class TaskConfigurationUtil {
/**
- * Split Scaling configuration to task configurations.
+ * Split job configuration to task configurations.
*
- * @param scalingConfig scaling configuration
+ * @param jobConfig job configuration
* @return list of task configurations
*/
- public static Collection<TaskConfiguration> toTaskConfigs(final ScalingConfiguration scalingConfig) {
+ public static Collection<TaskConfiguration> toTaskConfigs(final JobConfiguration jobConfig) {
Collection<TaskConfiguration> result = new LinkedList<>();
- ShardingSphereJDBCDataSourceConfiguration sourceConfig = getSourceConfig(scalingConfig);
+ ShardingSphereJDBCDataSourceConfiguration sourceConfig = getSourceConfig(jobConfig);
ShardingRuleConfiguration sourceRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(sourceConfig.getRule());
Map<String, DataSourceConfiguration> sourceDataSource = ConfigurationYamlConverter.loadDataSourceConfigs(sourceConfig.getDataSource());
Map<String, DataSource> dataSourceMap = sourceDataSource.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().createDataSource()));
Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(new ShardingRule(sourceRuleConfig, sourceConfig.getDatabaseType(), dataSourceMap));
- Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfig(scalingConfig);
- filterByShardingDataSourceTables(dataSourceTableNameMap, scalingConfig.getJobConfiguration());
+ Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfig(jobConfig);
+ filterByShardingDataSourceTables(dataSourceTableNameMap, jobConfig.getHandleConfig());
Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig));
for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
DumperConfiguration dumperConfig = createDumperConfig(entry.getKey(), sourceDataSource.get(entry.getKey()).getProps(), entry.getValue());
- ImporterConfiguration importerConfig = createImporterConfig(scalingConfig, shardingColumnsMap);
- result.add(new TaskConfiguration(scalingConfig.getJobConfiguration(), dumperConfig, importerConfig));
+ ImporterConfiguration importerConfig = createImporterConfig(jobConfig, shardingColumnsMap);
+ result.add(new TaskConfiguration(jobConfig.getHandleConfig(), dumperConfig, importerConfig));
}
return result;
}
- private static ShardingSphereJDBCDataSourceConfiguration getSourceConfig(final ScalingConfiguration scalingConfig) {
- ScalingDataSourceConfiguration result = scalingConfig.getRuleConfiguration().getSource().unwrap();
+ private static ShardingSphereJDBCDataSourceConfiguration getSourceConfig(final JobConfiguration jobConfig) {
+ ScalingDataSourceConfiguration result = jobConfig.getRuleConfig().getSource().unwrap();
Preconditions.checkArgument(result instanceof ShardingSphereJDBCDataSourceConfiguration, "Only support ShardingSphere source data source.");
return (ShardingSphereJDBCDataSourceConfiguration) result;
}
- private static Optional<ShardingRuleConfiguration> getTargetRuleConfig(final ScalingConfiguration scalingConfig) {
- ScalingDataSourceConfiguration dataSourceConfig = scalingConfig.getRuleConfiguration().getTarget().unwrap();
+ private static Optional<ShardingRuleConfiguration> getTargetRuleConfig(final JobConfiguration jobConfig) {
+ ScalingDataSourceConfiguration dataSourceConfig = jobConfig.getRuleConfig().getTarget().unwrap();
if (dataSourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration) {
return Optional.of(ConfigurationYamlConverter.loadShardingRuleConfig(((ShardingSphereJDBCDataSourceConfiguration) dataSourceConfig).getRule()));
}
return Optional.empty();
}
- private static void filterByShardingDataSourceTables(final Map<String, Map<String, String>> dataSourceTableNameMap, final JobConfiguration jobConfig) {
- if (null == jobConfig.getShardingTables() || null == jobConfig.getShardingItem()) {
+ private static void filterByShardingDataSourceTables(final Map<String, Map<String, String>> dataSourceTableNameMap, final HandleConfiguration handleConfig) {
+ if (null == handleConfig.getShardingTables() || null == handleConfig.getShardingItem()) {
return;
}
- Map<String, Set<String>> shardingDataSourceTableMap = toDataSourceTableNameMap(getShardingDataSourceTables(jobConfig));
+ Map<String, Set<String>> shardingDataSourceTableMap = toDataSourceTableNameMap(getShardingDataSourceTables(handleConfig));
dataSourceTableNameMap.entrySet().removeIf(entry -> !shardingDataSourceTableMap.containsKey(entry.getKey()));
for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
filterByShardingTables(entry.getValue(), shardingDataSourceTableMap.get(entry.getKey()));
}
}
- private static String getShardingDataSourceTables(final JobConfiguration jobConfig) {
- if (jobConfig.getShardingItem() >= jobConfig.getShardingTables().length) {
+ private static String getShardingDataSourceTables(final HandleConfiguration handleConfig) {
+ if (handleConfig.getShardingItem() >= handleConfig.getShardingTables().length) {
return "";
}
- return jobConfig.getShardingTables()[jobConfig.getShardingItem()];
+ return handleConfig.getShardingTables()[handleConfig.getShardingItem()];
}
private static void filterByShardingTables(final Map<String, String> fullTables, final Set<String> shardingTables) {
@@ -211,36 +211,36 @@ public final class TaskConfigurationUtil {
return result;
}
- private static ImporterConfiguration createImporterConfig(final ScalingConfiguration scalingConfig, final Map<String, Set<String>> shardingColumnsMap) {
+ private static ImporterConfiguration createImporterConfig(final JobConfiguration jobConfig, final Map<String, Set<String>> shardingColumnsMap) {
ImporterConfiguration result = new ImporterConfiguration();
- result.setDataSourceConfig(scalingConfig.getRuleConfiguration().getTarget().unwrap());
+ result.setDataSourceConfig(jobConfig.getRuleConfig().getTarget().unwrap());
result.setShardingColumnsMap(shardingColumnsMap);
- result.setRetryTimes(scalingConfig.getJobConfiguration().getRetryTimes());
+ result.setRetryTimes(jobConfig.getHandleConfig().getRetryTimes());
return result;
}
/**
* Fill in sharding tables.
*
- * @param scalingConfig scaling configuration
+ * @param jobConfig job configuration
*/
- public static void fillInShardingTables(final ScalingConfiguration scalingConfig) {
- if (null != scalingConfig.getJobConfiguration().getShardingTables()) {
+ public static void fillInShardingTables(final JobConfiguration jobConfig) {
+ if (null != jobConfig.getHandleConfig().getShardingTables()) {
return;
}
- scalingConfig.getJobConfiguration().setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(scalingConfig)));
+ jobConfig.getHandleConfig().setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(jobConfig)));
}
- private static List<String> getShouldScalingActualDataNodes(final ScalingConfiguration scalingConfig) {
- ScalingDataSourceConfiguration sourceConfig = scalingConfig.getRuleConfiguration().getSource().unwrap();
+ private static List<String> getShouldScalingActualDataNodes(final JobConfiguration jobConfig) {
+ ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
"Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
- if (!(scalingConfig.getRuleConfiguration().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
+ if (!(jobConfig.getRuleConfig().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
return getShardingRuleConfigMap(source.getRule()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
}
ShardingSphereJDBCDataSourceConfiguration target =
- (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
+ (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
Set<String> modifiedDataSources = getModifiedDataSources(source.getDataSource(), target.getDataSource());
Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap = getShardingRuleConfigMap(source.getRule());
Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap = getShardingRuleConfigMap(target.getRule());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
index af8e097..c6cfc89 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.datasource;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
import org.junit.Before;
import org.junit.Test;
@@ -41,7 +41,7 @@ public final class DataSourceManagerTest {
@Before
@SneakyThrows(IOException.class)
public void setUp() {
- taskConfigurations = ScalingConfigurationUtil.initJob("/config.json").getTaskConfigs();
+ taskConfigurations = JobConfigurationUtil.initJob("/config.json").getTaskConfigs();
}
@Test
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
index a9af3da..e75bb45 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
@@ -21,7 +21,7 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import org.junit.Test;
import javax.sql.DataSource;
@@ -61,6 +61,6 @@ public final class AbstractDataConsistencyCheckerTest {
@SneakyThrows(IOException.class)
private ScalingJob mockScalingJob() {
- return ScalingConfigurationUtil.initJob("/config.json");
+ return JobConfigurationUtil.initJob("/config.json");
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
index 8d7f27f..426d172 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -76,7 +76,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
- taskConfig.getJobConfig().setShardingSize(10);
+ taskConfig.getHandleConfig().setShardingSize(10);
initIntPrimaryEnvironment(taskConfig.getDumperConfig());
List<ScalingTask> actual = (List<ScalingTask>) inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
assertNotNull(actual);
@@ -153,10 +153,10 @@ public final class InventoryTaskSplitterTest {
@SneakyThrows(IOException.class)
private ScalingJob mockScalingJob() {
- ScalingJob result = ScalingConfigurationUtil.initJob("/config.json");
- result.getScalingConfig().getJobConfiguration().setDatabaseType("H2");
- result.getScalingConfig().getJobConfiguration().setShardingSize(10);
- taskConfig = new TaskConfiguration(result.getScalingConfig().getJobConfiguration(), mockDumperConfig(), new ImporterConfiguration());
+ ScalingJob result = JobConfigurationUtil.initJob("/config.json");
+ result.getJobConfig().getHandleConfig().setDatabaseType("H2");
+ result.getJobConfig().getHandleConfig().setShardingSize(10);
+ taskConfig = new TaskConfiguration(result.getJobConfig().getHandleConfig(), mockDumperConfig(), new ImporterConfiguration());
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
index 34f5fea..ca86591 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.job.task.inventory;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
@@ -58,7 +58,7 @@ public final class InventoryTaskTest {
DumperConfiguration dumperConfig = mockDumperConfig();
ImporterConfiguration importerConfig = mockImporterConfig();
ScalingContext.getInstance().init(new ServerConfiguration());
- taskConfig = new TaskConfiguration(new JobConfiguration(), dumperConfig, importerConfig);
+ taskConfig = new TaskConfiguration(new HandleConfiguration(), dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 7a4a83f..9ee9501 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurat
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
-import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import org.junit.After;
@@ -66,21 +66,21 @@ public final class DistributedScalingJobServiceTest {
@Test
public void assertListJobs() {
assertThat(scalingJobService.listJobs().size(), is(0));
- scalingJobService.start(mockScalingConfiguration());
+ scalingJobService.start(mockJobConfiguration());
assertThat(scalingJobService.listJobs().size(), is(1));
}
@Test
- public void assertStartWithScalingConfig() {
- Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ public void assertStartWithJobConfig() {
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockJobConfiguration());
assertTrue(scalingJob.isPresent());
assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":true"));
}
@Test
public void assertStartWithCallbackImmediately() {
- ScalingConfiguration scalingConfig = mockScalingConfiguration();
- ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
+ JobConfiguration jobConfig = mockJobConfiguration();
+ ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getSource().unwrap();
RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent("schema", source.getDataSource(), source.getRule(), source.getRule(), "cacheId");
Optional<ScalingJob> scalingJob = scalingJobService.start(event);
assertFalse(scalingJob.isPresent());
@@ -88,9 +88,9 @@ public final class DistributedScalingJobServiceTest {
@Test
public void assertStartWithCallbackSuccess() throws IOException {
- ScalingConfiguration scalingConfig = ScalingConfigurationUtil.initConfig("/config_sharding_sphere_jdbc_target.json");
- ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
- ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
+ JobConfiguration jobConfig = JobConfigurationUtil.initJobConfig("/config_sharding_sphere_jdbc_target.json");
+ ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getSource().unwrap();
+ ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent(
"schema", source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), "cacheId");
Optional<ScalingJob> scalingJob = scalingJobService.start(event);
@@ -99,7 +99,7 @@ public final class DistributedScalingJobServiceTest {
@Test
public void assertStop() {
- Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockJobConfiguration());
assertTrue(scalingJob.isPresent());
scalingJobService.stop(scalingJob.get().getJobId());
assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(scalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":false"));
@@ -112,7 +112,7 @@ public final class DistributedScalingJobServiceTest {
@Test
public void assertGetProgress() {
- registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/config"), new Gson().toJson(mockScalingConfiguration()));
+ registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/config"), new Gson().toJson(mockJobConfiguration()));
registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/position/0/inventory"),
"{'unfinished': {'ds1.table1#1':[0,100],'ds1.table1#2':[160,200],'ds1.table3':[]},'finished':['ds1.table2#1','ds1.table2#2']}");
registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/position/0/incremental"),
@@ -155,7 +155,7 @@ public final class DistributedScalingJobServiceTest {
}
@SneakyThrows(IOException.class)
- private ScalingConfiguration mockScalingConfiguration() {
- return ScalingConfigurationUtil.initConfig("/config.json");
+ private JobConfiguration mockJobConfiguration() {
+ return JobConfigurationUtil.initJobConfig("/config.json");
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index dd8b98c..16bdcf5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.service.impl;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResu
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
-import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
import org.junit.Before;
import org.junit.Test;
@@ -75,7 +75,7 @@ public final class StandaloneScalingJobServiceTest {
@Test
public void assertStartJob() {
- Optional<ScalingJob> scalingJob = scalingJobService.start(mockScalingConfiguration());
+ Optional<ScalingJob> scalingJob = scalingJobService.start(mockJobConfiguration());
assertTrue(scalingJob.isPresent());
long jobId = scalingJob.get().getJobId();
JobProgress progress = scalingJobService.getProgress(jobId);
@@ -106,13 +106,13 @@ public final class StandaloneScalingJobServiceTest {
@Test
public void assertListJobs() {
assertThat(scalingJobService.listJobs().size(), is(0));
- scalingJobService.start(mockScalingConfiguration());
+ scalingJobService.start(mockJobConfiguration());
assertThat(scalingJobService.listJobs().size(), is(1));
}
@Test
public void assertCheckJob() {
- Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
+ Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockJobConfiguration());
assertTrue(scalingJobOptional.isPresent());
ScalingJob scalingJob = scalingJobOptional.get();
scalingJob.setDatabaseType("H2");
@@ -124,7 +124,7 @@ public final class StandaloneScalingJobServiceTest {
@Test
@SneakyThrows(SQLException.class)
public void assertResetJob() {
- Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockScalingConfiguration());
+ Optional<ScalingJob> scalingJobOptional = scalingJobService.start(mockJobConfiguration());
assertTrue(scalingJobOptional.isPresent());
ScalingJob scalingJob = scalingJobOptional.get();
ScalingDataSourceConfiguration dataSourceConfig = scalingJob.getTaskConfigs().get(0).getImporterConfig().getDataSourceConfig();
@@ -135,8 +135,8 @@ public final class StandaloneScalingJobServiceTest {
}
@SneakyThrows(IOException.class)
- private ScalingConfiguration mockScalingConfiguration() {
- return ScalingConfigurationUtil.initConfig("/config.json");
+ private JobConfiguration mockJobConfiguration() {
+ return JobConfigurationUtil.initJobConfig("/config.json");
}
private void initTableData(final ScalingDataSourceConfiguration dataSourceConfig) throws SQLException {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ScalingConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
similarity index 59%
rename from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ScalingConfigurationUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
index 6a03731..c365570 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ScalingConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
@@ -15,34 +15,46 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.util;
+package org.apache.shardingsphere.scaling.core.util;
import com.google.gson.Gson;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
- * Scaling configuration util.
+ * Job configuration util.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ScalingConfigurationUtil {
+public final class JobConfigurationUtil {
/**
* Init job config.
*
* @param configFile config file
- * @return ScalingConfiguration
+ * @return job configuration
* @throws IOException IO exception
*/
- public static ScalingConfiguration initConfig(final String configFile) throws IOException {
- try (InputStream fileInputStream = ScalingConfigurationUtil.class.getResourceAsStream(configFile);
+ public static JobConfiguration initJobConfig(final String configFile) throws IOException {
+ try (InputStream fileInputStream = JobConfigurationUtil.class.getResourceAsStream(configFile);
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream)) {
- return new Gson().fromJson(inputStreamReader, ScalingConfiguration.class);
+ return new Gson().fromJson(inputStreamReader, JobConfiguration.class);
}
}
+
+ /**
+ * Init job from config file.
+ *
+ * @param configFile config file
+ * @return scaling job
+ * @throws IOException IO exception
+ */
+ public static ScalingJob initJob(final String configFile) throws IOException {
+ return new ScalingJob(initJobConfig(configFile));
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
deleted file mode 100644
index 6d36e1c..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/ScalingConfigurationUtil.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.util;
-
-import com.google.gson.Gson;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
-import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Scaling configuration util.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ScalingConfigurationUtil {
-
- /**
- * Init job config.
- *
- * @param configFile config file
- * @return ScalingConfiguration
- * @throws IOException IO exception
- */
- public static ScalingConfiguration initConfig(final String configFile) throws IOException {
- try (InputStream fileInputStream = ScalingConfigurationUtil.class.getResourceAsStream(configFile);
- InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream)) {
- return new Gson().fromJson(inputStreamReader, ScalingConfiguration.class);
- }
- }
-
- /**
- * Init job from config file.
- *
- * @param configFile config file
- * @return scaling job
- * @throws IOException IO exception
- */
- public static ScalingJob initJob(final String configFile) throws IOException {
- return new ScalingJob(initConfig(configFile));
- }
-
- /**
- * Get config by file.
- *
- * @param configFile config file
- * @return config string
- * @throws IOException IO exception
- */
- public static String getConfig(final String configFile) throws IOException {
- StringBuilder result = new StringBuilder();
- try (FileReader fileReader = new FileReader(ScalingConfigurationUtil.class.getResource(configFile).getFile());
- BufferedReader bufferedReader = new BufferedReader(fileReader)) {
- String line;
- while (null != (line = bufferedReader.readLine())) {
- result.append(line).append(System.lineSeparator());
- }
- return result.toString();
- }
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config.json b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config.json
index ca827f3..ac2a2bd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config.json
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config.json
@@ -16,7 +16,7 @@
*/
{
- "ruleConfiguration": {
+ "ruleConfig": {
"source": {
"type": "shardingSphereJdbc",
"parameter": {
@@ -34,7 +34,7 @@
}
}
},
- "jobConfiguration": {
+ "handleConfig": {
"concurrency": 3,
"shardingTables": ["ds_0.t1", "ds_0.t2"],
"shardingItem": 0
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json
index 7b47e30..97f50b9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json
@@ -16,7 +16,7 @@
*/
{
- "ruleConfiguration": {
+ "ruleConfig": {
"source": {
"type": "shardingSphereJdbc",
"parameter": {
@@ -32,7 +32,7 @@
}
}
},
- "jobConfiguration": {
+ "handleConfig": {
"concurrency": 3
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
index 0abdbf2..17efb6c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
@@ -24,7 +24,6 @@ import com.google.gson.JsonSyntaxException;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
@@ -33,7 +32,7 @@ import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
import org.apache.shardingsphere.scaling.core.spi.ScalingWorker;
@@ -74,17 +73,17 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
private void watchConfigRepository() {
REGISTRY_REPOSITORY.watch(ScalingConstant.SCALING_LISTENER_PATH, event -> {
- Optional<ScalingConfiguration> scalingConfig = getScalingConfig(event);
- if (!scalingConfig.isPresent()) {
+ Optional<JobConfiguration> jobConfig = getJobConfig(event);
+ if (!jobConfig.isPresent()) {
return;
}
switch (event.getType()) {
case ADDED:
case UPDATED:
- executeJob(getJobId(event.getKey()), scalingConfig.get());
+ executeJob(getJobId(event.getKey()), jobConfig.get());
break;
case DELETED:
- deleteJob(getJobId(event.getKey()), scalingConfig.get());
+ deleteJob(getJobId(event.getKey()), jobConfig.get());
break;
default:
break;
@@ -100,62 +99,63 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
return key.split("/")[2];
}
- private Optional<ScalingConfiguration> getScalingConfig(final DataChangedEvent event) {
+ private Optional<JobConfiguration> getJobConfig(final DataChangedEvent event) {
if (!CONFIG_PATTERN.matcher(event.getKey()).matches()) {
return Optional.empty();
}
try {
- log.info("{} scaling config: {} = {}", event.getType(), event.getKey(), event.getValue());
- return Optional.of(GSON.fromJson(event.getValue(), ScalingConfiguration.class));
+ log.info("{} job config: {} = {}", event.getType(), event.getKey(), event.getValue());
+ return Optional.of(GSON.fromJson(event.getValue(), JobConfiguration.class));
} catch (JsonSyntaxException ex) {
- log.error("analyze scaling config failed.", ex);
+ log.error("analyze job config failed.", ex);
}
return Optional.empty();
}
- private void executeJob(final String jobId, final ScalingConfiguration scalingConfig) {
+ private void executeJob(final String jobId, final JobConfiguration jobConfig) {
JobBootstrapWrapper jobBootstrapWrapper = scalingJobBootstrapMap.get(jobId);
if (null == jobBootstrapWrapper) {
- createJob(jobId, scalingConfig);
+ createJob(jobId, jobConfig);
return;
}
- updateJob(jobId, scalingConfig);
+ updateJob(jobId, jobConfig);
}
- private void createJob(final String jobId, final ScalingConfiguration scalingConfig) {
- if (scalingConfig.getJobConfiguration().isRunning()) {
- JobBootstrapWrapper jobBootstrapWrapper = new JobBootstrapWrapper(jobId, scalingConfig);
+ private void createJob(final String jobId, final JobConfiguration jobConfig) {
+ if (jobConfig.getHandleConfig().isRunning()) {
+ JobBootstrapWrapper jobBootstrapWrapper = new JobBootstrapWrapper(jobId, jobConfig);
jobBootstrapWrapper.getJobBootstrap().execute();
scalingJobBootstrapMap.put(jobId, jobBootstrapWrapper);
}
}
- private void updateJob(final String jobId, final ScalingConfiguration scalingConfig) {
+ private void updateJob(final String jobId, final JobConfiguration jobConfig) {
JobBootstrapWrapper jobBootstrapWrapper = scalingJobBootstrapMap.get(jobId);
- if (jobBootstrapWrapper.isRunning() && scalingConfig.getJobConfiguration().isRunning()) {
+ if (jobBootstrapWrapper.isRunning() && jobConfig.getHandleConfig().isRunning()) {
log.warn("scaling elastic job has already running, ignore current config.");
return;
}
- if (jobBootstrapWrapper.isRunning() == scalingConfig.getJobConfiguration().isRunning()) {
+ if (jobBootstrapWrapper.isRunning() == jobConfig.getHandleConfig().isRunning()) {
return;
}
if (new LeaderService(registryCenter, jobId).isLeader()) {
log.info("leader worker update config.");
JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
governanceConfig.getName() + ScalingConstant.SCALING_ELASTIC_JOB_PATH, null)
- .updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, scalingConfig)));
+ .updateJobConfiguration(JobConfigurationPOJO.fromJobConfiguration(createJobConfig(jobId, jobConfig)));
}
- jobBootstrapWrapper.setRunning(scalingConfig.getJobConfiguration().isRunning());
+ jobBootstrapWrapper.setRunning(jobConfig.getHandleConfig().isRunning());
jobBootstrapWrapper.getJobBootstrap().execute();
}
- private JobConfiguration createJobConfig(final String jobId, final ScalingConfiguration scalingConfig) {
- return JobConfiguration.newBuilder(jobId, scalingConfig.getJobConfiguration().getShardingTables().length).jobParameter(GSON.toJson(scalingConfig)).overwrite(true).build();
+ private org.apache.shardingsphere.elasticjob.api.JobConfiguration createJobConfig(final String jobId, final JobConfiguration jobConfig) {
+ return org.apache.shardingsphere.elasticjob.api.JobConfiguration.newBuilder(jobId, jobConfig.getHandleConfig().getShardingTables().length)
+ .jobParameter(GSON.toJson(jobConfig)).overwrite(true).build();
}
- private void deleteJob(final String jobId, final ScalingConfiguration scalingConfig) {
- scalingConfig.getJobConfiguration().setRunning(false);
- executeJob(jobId, scalingConfig);
+ private void deleteJob(final String jobId, final JobConfiguration jobConfig) {
+ jobConfig.getHandleConfig().setRunning(false);
+ executeJob(jobId, jobConfig);
}
@Getter
@@ -166,9 +166,9 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
private boolean running;
- private JobBootstrapWrapper(final String jobId, final ScalingConfiguration scalingConfig) {
- jobBootstrap = new OneOffJobBootstrap(registryCenter, new ScalingElasticJob(), createJobConfig(jobId, scalingConfig));
- running = scalingConfig.getJobConfiguration().isRunning();
+ private JobBootstrapWrapper(final String jobId, final JobConfiguration jobConfig) {
+ jobBootstrap = new OneOffJobBootstrap(registryCenter, new ScalingElasticJob(), createJobConfig(jobId, jobConfig));
+ running = jobConfig.getHandleConfig().isRunning();
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
index 4b3c127..47740a0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
@@ -50,11 +50,11 @@ public final class FinishedCheckJob implements SimpleJob {
long jobId = Long.parseLong(each);
try {
ScalingJob scalingJob = scalingJobService.getJob(jobId);
- WorkflowConfiguration workflowConfig = scalingJob.getScalingConfig().getJobConfiguration().getWorkflowConfig();
+ WorkflowConfiguration workflowConfig = scalingJob.getJobConfig().getHandleConfig().getWorkflowConfig();
if (workflowConfig == null) {
continue;
}
- if (ScalingTaskUtil.allTasksAlmostFinished(scalingJobService.getProgress(jobId), scalingJob.getScalingConfig().getJobConfiguration())) {
+ if (ScalingTaskUtil.allTasksAlmostFinished(scalingJobService.getProgress(jobId), scalingJob.getJobConfig().getHandleConfig())) {
log.info("scaling job {} almost finished.", jobId);
trySwitch(jobId, workflowConfig);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
index 34a6824..f0735cc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.scaling.core.api.JobSchedulerCenter;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.service.impl.StandaloneScalingJobService;
@@ -43,19 +43,19 @@ public final class ScalingElasticJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
log.info("execute job: {} - {}/{}", shardingContext.getTaskId(), shardingContext.getShardingItem(), shardingContext.getShardingTotalCount());
- ScalingConfiguration scalingConfig = GSON.fromJson(shardingContext.getJobParameter(), ScalingConfiguration.class);
- if (scalingConfig.getJobConfiguration().isRunning()) {
- startJob(scalingConfig, shardingContext);
+ JobConfiguration jobConfig = GSON.fromJson(shardingContext.getJobParameter(), JobConfiguration.class);
+ if (jobConfig.getHandleConfig().isRunning()) {
+ startJob(jobConfig, shardingContext);
return;
}
stopJob(shardingContext);
}
- private void startJob(final ScalingConfiguration scalingConfig, final ShardingContext shardingContext) {
+ private void startJob(final JobConfiguration jobConfig, final ShardingContext shardingContext) {
log.info("start job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
- scalingConfig.getJobConfiguration().setShardingItem(shardingContext.getShardingItem());
- scalingConfig.getJobConfiguration().setJobId(Long.valueOf(shardingContext.getJobName()));
- scalingJob = SCALING_JOB_SERVICE.start(scalingConfig).orElse(null);
+ jobConfig.getHandleConfig().setShardingItem(shardingContext.getShardingItem());
+ jobConfig.getHandleConfig().setJobId(Long.valueOf(shardingContext.getJobName()));
+ scalingJob = SCALING_JOB_SERVICE.start(jobConfig).orElse(null);
JobSchedulerCenter.addJob(scalingJob);
}