You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/12/04 10:23:57 UTC
[shardingsphere] branch master updated:
DistributedScalingJobService add callback function (#8504)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa295fc DistributedScalingJobService add callback function (#8504)
fa295fc is described below
commit fa295fc35bcfa63085bae74266dea315f1dec014
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Dec 4 18:23:10 2020 +0800
DistributedScalingJobService add callback function (#8504)
* DistributedScalingJobService add callback function
* Update governance dependency
Co-authored-by: qiulu3 <Lucas209910>
---
.../shardingsphere-scaling-core/pom.xml | 11 ++-
.../scaling/core/config/JobConfiguration.java | 4 +-
.../core/service/AbstractScalingJobService.java | 67 ++++++++++++-----
.../ScalingCallback.java} | 31 +++-----
.../scaling/core/service/ScalingJobService.java | 21 ++----
.../service/impl/DistributedScalingJobService.java | 13 +++-
.../scaling/core/utils/ProxyConfigurationUtil.java | 86 ----------------------
.../scaling/core/utils/ScalingTaskUtil.java | 21 +++++-
.../scaling/core/utils/TaskConfigurationUtil.java | 2 +-
.../impl/DistributedScalingJobServiceTest.java | 63 +++++++++++-----
.../config_sharding_sphere_jdbc_target.json | 38 ++++++++++
.../test/resources/proxy_config-sharding_1.yaml | 64 ----------------
.../test/resources/proxy_config-sharding_2.yaml | 84 ---------------------
13 files changed, 190 insertions(+), 315 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index 19911d2..90dbe30 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -41,6 +41,11 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-governance-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-governance-repository-zookeeper-curator</artifactId>
<version>${project.version}</version>
</dependency>
@@ -64,11 +69,5 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-proxy-common</artifactId>
- <version>5.0.0-RC1-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
</dependencies>
</project>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index cb6bc0d..6ebaae4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -37,7 +37,9 @@ public final class JobConfiguration {
private int shardingItem;
- private int shardingSize = 10000000;
+ private int shardingSize = 1000 * 10000;
private boolean running = true;
+
+ private long allowDelay = 60 * 1000L;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index f9a2a20..93f65f7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -17,39 +17,47 @@
package org.apache.shardingsphere.scaling.core.service;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.utils.ProxyConfigurationUtil;
-import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Abstract scaling job service.
*/
public abstract class AbstractScalingJobService implements ScalingJobService {
- @Override
- public boolean shouldScaling(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
- ScalingConfiguration scalingConfig = ProxyConfigurationUtil.toScalingConfig(oldYamlProxyConfig, newYamlProxyConfig);
- TaskConfigurationUtil.fillInShardingTables(scalingConfig);
- return shouldScaling(scalingConfig);
- }
-
- private boolean shouldScaling(final ScalingConfiguration scalingConfig) {
- return scalingConfig.getJobConfiguration().getShardingTables().length > 0;
- }
+ private static final ScheduledExecutorService FINISH_CHECK_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("Scaling-finish-check-%d"));
@Override
- public Optional<ScalingJob> start(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
- ScalingConfiguration scalingConfig = ProxyConfigurationUtil.toScalingConfig(oldYamlProxyConfig, newYamlProxyConfig);
- TaskConfigurationUtil.fillInShardingTables(scalingConfig);
- if (!shouldScaling(scalingConfig)) {
- return Optional.empty();
+ public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
+ Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
+ if (!result.isPresent()) {
+ scalingCallback.onSuccess();
+ return result;
}
+ FINISH_CHECK_EXECUTOR.scheduleWithFixedDelay(new JobFinishChecker(result.get(), scalingCallback), 3, 1, TimeUnit.MINUTES);
+ return result;
+ }
+
+ private Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule) {
+ ScalingConfiguration scalingConfig = new ScalingConfiguration();
+ scalingConfig.setRuleConfiguration(
+ new RuleConfiguration(new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule), new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
+ scalingConfig.setJobConfiguration(new JobConfiguration());
return start(scalingConfig);
}
@@ -73,4 +81,29 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
}
return result;
}
+
+ @RequiredArgsConstructor
+ private class JobFinishChecker implements Runnable {
+
+ private final ScalingJob scalingJob;
+
+ private final ScalingCallback scalingCallback;
+
+ private boolean finished;
+
+ @Override
+ public void run() {
+ if (finished) {
+ return;
+ }
+ JobProgress jobProgress = getProgress(scalingJob.getJobId());
+ if (jobProgress.getStatus().contains("FAILURE")) {
+ finished = true;
+ scalingCallback.onFailure();
+ } else if (ScalingTaskUtil.allTasksAlmostFinished(jobProgress, scalingJob.getScalingConfig().getJobConfiguration())) {
+ finished = true;
+ scalingCallback.onSuccess();
+ }
+ }
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
similarity index 65%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
index cb6bc0d..b20eba2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
@@ -15,29 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.config;
-
-import lombok.Getter;
-import lombok.Setter;
+package org.apache.shardingsphere.scaling.core.service;
/**
- * Job configuration.
+ * Scaling callback.
*/
-@Setter
-@Getter
-public final class JobConfiguration {
-
- private Long jobId;
-
- private int concurrency = 3;
-
- private int retryTimes = 3;
-
- private String[] shardingTables;
-
- private int shardingItem;
+public interface ScalingCallback {
- private int shardingSize = 10000000;
+ /**
+ * Callback invoked when execution succeeds.
+ */
+ void onSuccess();
- private boolean running = true;
+ /**
+ * Callback invoked when execution fails.
+ */
+ void onFailure();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index 9002a3d..87aaa3a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -39,15 +39,6 @@ public interface ScalingJobService {
List<ScalingJob> listJobs();
/**
- * Check new yaml proxy configuration if should scaling.
- *
- * @param oldYamlProxyConfig old yaml proxy configuration
- * @param newYamlProxyConfig new yaml proxy configuration
- * @return if should scaling
- */
- boolean shouldScaling(String oldYamlProxyConfig, String newYamlProxyConfig);
-
- /**
* Start scaling job.
*
* @param scalingConfig scaling job configuration
@@ -55,15 +46,17 @@ public interface ScalingJobService {
*/
Optional<ScalingJob> start(ScalingConfiguration scalingConfig);
-
/**
- * Start scaling job if it should scaling.
+ * Start scaling job.
*
- * @param oldYamlProxyConfig old yaml proxy configuration
- * @param newYamlProxyConfig new yaml proxy configuration
+ * @param sourceDataSource source data source
+ * @param sourceRule source rule
+ * @param targetDataSource target data source
+ * @param targetRule target rule
+ * @param scalingCallback scaling callback
* @return scaling job
*/
- Optional<ScalingJob> start(String oldYamlProxyConfig, String newYamlProxyConfig);
+ Optional<ScalingJob> start(String sourceDataSource, String sourceRule, String targetDataSource, String targetRule, ScalingCallback scalingCallback);
/**
* Stop a job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index 0b06059..069a044 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -57,10 +57,17 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
@Override
public Optional<ScalingJob> start(final ScalingConfiguration scalingConfig) {
- ScalingJob scalingJob = new ScalingJob();
TaskConfigurationUtil.fillInShardingTables(scalingConfig);
- updateScalingConfig(scalingJob.getJobId(), scalingConfig);
- return Optional.of(scalingJob);
+ if (shouldScaling(scalingConfig)) {
+ ScalingJob scalingJob = new ScalingJob();
+ updateScalingConfig(scalingJob.getJobId(), scalingConfig);
+ return Optional.of(scalingJob);
+ }
+ return Optional.empty();
+ }
+
+ private boolean shouldScaling(final ScalingConfiguration scalingConfig) {
+ return scalingConfig.getJobConfiguration().getShardingTables().length > 0;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ProxyConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ProxyConfigurationUtil.java
deleted file mode 100644
index 4a9e7d5..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ProxyConfigurationUtil.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.utils;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfiguration;
-import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfigurationWrap;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.proxy.config.yaml.YamlDataSourceParameter;
-import org.apache.shardingsphere.proxy.config.yaml.YamlProxyRuleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-
-/**
- * Proxy configuration Util.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ProxyConfigurationUtil {
-
- /**
- * Yaml proxy configuration transform to {@code ScalingConfiguration}.
- *
- * @param oldYamlProxyConfig old yaml proxy configuration
- * @param newYamlProxyConfig new yaml proxy configuration
- * @return {@code ScalingConfiguration} instance
- */
- public static ScalingConfiguration toScalingConfig(final String oldYamlProxyConfig, final String newYamlProxyConfig) {
- ScalingConfiguration result = new ScalingConfiguration();
- result.setRuleConfiguration(new RuleConfiguration(toDataSourceConfig(oldYamlProxyConfig), toDataSourceConfig(newYamlProxyConfig)));
- result.setJobConfiguration(new JobConfiguration());
- return result;
- }
-
- private static DataSourceConfiguration toDataSourceConfig(final String yamlProxyConfig) {
- YamlProxyRuleConfiguration proxyRuleConfig = YamlEngine.unmarshal(yamlProxyConfig, YamlProxyRuleConfiguration.class);
- return new ShardingSphereJDBCDataSourceConfiguration(getDataSourceConfig(proxyRuleConfig), getRuleConfig(proxyRuleConfig));
- }
-
- private static String getDataSourceConfig(final YamlProxyRuleConfiguration proxyRuleConfig) {
- YamlDataSourceConfigurationWrap result = new YamlDataSourceConfigurationWrap();
- Map<String, YamlDataSourceConfiguration> dataSources = proxyRuleConfig.getDataSources().entrySet().stream()
- .collect(Collectors.toMap(Entry::getKey, entry -> toYamlDataSourceConfig(proxyRuleConfig, entry.getValue())));
- result.setDataSources(dataSources);
- return YamlEngine.marshal(result);
- }
-
- @SneakyThrows(IllegalAccessException.class)
- private static YamlDataSourceConfiguration toYamlDataSourceConfig(final YamlProxyRuleConfiguration proxyRuleConfig, final YamlDataSourceParameter yamlDataSourceParameter) {
- YamlDataSourceConfiguration result = new YamlDataSourceConfiguration();
- result.setDataSourceClassName("com.zaxxer.hikari.HikariDataSource");
- Map<String, Object> props = new HashMap<>(ReflectionUtil.getFieldMap(yamlDataSourceParameter));
- result.setProps(props);
- return result;
- }
-
- private static String getRuleConfig(final YamlProxyRuleConfiguration proxyRuleConfig) {
- YamlProxyRuleConfiguration result = new YamlProxyRuleConfiguration();
- result.setRules(proxyRuleConfig.getRules());
- return YamlEngine.marshal(result);
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index cc55a09..c62f2de 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -17,12 +17,16 @@
package org.apache.shardingsphere.scaling.core.utils;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
-import java.util.List;
+import java.util.Collection;
/**
* Scaling task util.
@@ -35,11 +39,24 @@ public final class ScalingTaskUtil {
* @param inventoryTasks to check inventory tasks
* @return is finished
*/
- public static boolean allInventoryTasksFinished(final List<ScalingTask> inventoryTasks) {
+ public static boolean allInventoryTasksFinished(final Collection<ScalingTask> inventoryTasks) {
return inventoryTasks.stream().allMatch(each -> ((InventoryTask) each).getPositionManager().getPosition() instanceof FinishedPosition);
}
/**
+ * Check whether all inventory tasks are finished and the delay of every incremental task is less than the allowed value.
+ *
+ * @param jobProgress job progress
+ * @param jobConfig job configuration
+ * @return almost finished or not
+ */
+ public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final JobConfiguration jobConfig) {
+ return jobProgress.getInventoryTaskProgress().values().stream().flatMap(Collection::stream).allMatch(each -> ((InventoryTaskProgress) each).isFinished())
+ && jobProgress.getIncrementalTaskProgress().values().stream().flatMap(Collection::stream)
+ .allMatch(each -> ((IncrementalTaskProgress) each).getDelayMillisecond() < jobConfig.getAllowDelay());
+ }
+
+ /**
* Get scaling listener path.
*
* @param paths sub paths.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
index 8009863..9d99fa9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
@@ -237,10 +237,10 @@ public final class TaskConfigurationUtil {
}
ShardingSphereJDBCDataSourceConfiguration target =
(ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
- List<String> result = new ArrayList<>();
Set<String> modifiedDataSources = getModifiedDataSources(source.getDataSource(), target.getDataSource());
Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap = getShardingRuleConfigMap(source.getRule());
Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap = getShardingRuleConfigMap(target.getRule());
+ List<String> result = new ArrayList<>();
newShardingRuleConfigMap.keySet().forEach(each -> {
if (!oldShardingRuleConfigMap.containsKey(each)) {
return;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 230de79..345fe89 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -24,12 +24,14 @@ import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
+import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
@@ -40,6 +42,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
@@ -69,15 +72,6 @@ public final class DistributedScalingJobServiceTest {
}
@Test
- @SneakyThrows(IOException.class)
- public void assertShouldScaling() {
- String oldConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_1.yaml");
- String newConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_2.yaml");
- assertFalse(scalingJobService.shouldScaling(oldConfig, oldConfig));
- assertTrue(scalingJobService.shouldScaling(oldConfig, newConfig));
- }
-
- @Test
public void assertStartWithScalingConfig() {
Optional<ScalingJob> shardingScalingJob = scalingJobService.start(mockScalingConfiguration());
assertTrue(shardingScalingJob.isPresent());
@@ -85,14 +79,22 @@ public final class DistributedScalingJobServiceTest {
}
@Test
- @SneakyThrows(IOException.class)
- public void assertStartWithProxyConfig() {
- String oldConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_1.yaml");
- String newConfig = ScalingConfigurationUtil.getConfig("/proxy_config-sharding_2.yaml");
- assertFalse(scalingJobService.start(oldConfig, oldConfig).isPresent());
- Optional<ScalingJob> shardingScalingJob = scalingJobService.start(oldConfig, newConfig);
- assertTrue(shardingScalingJob.isPresent());
- assertTrue(registryRepository.get(ScalingTaskUtil.getScalingListenerPath(shardingScalingJob.get().getJobId(), ScalingConstant.CONFIG)).contains("\"running\":true"));
+ public void assertStartWithCallbackImmediately() {
+ ScalingConfiguration scalingConfig = mockScalingConfiguration();
+ ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
+ AtomicBoolean successCallback = new AtomicBoolean();
+ Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback(successCallback));
+ assertFalse(scalingJob.isPresent());
+ assertTrue(successCallback.get());
+ }
+
+ @Test
+ public void assertStartWithCallbackSuccess() throws IOException {
+ ScalingConfiguration scalingConfig = ScalingConfigurationUtil.initConfig("/config_sharding_sphere_jdbc_target.json");
+ ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
+ ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
+ Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), mockScalingCallback());
+ assertTrue(scalingJob.isPresent());
}
@Test
@@ -150,6 +152,33 @@ public final class DistributedScalingJobServiceTest {
return result;
}
+ private ScalingCallback mockScalingCallback() {
+ return new ScalingCallback() {
+
+ @Override
+ public void onSuccess() {
+ }
+
+ @Override
+ public void onFailure() {
+ }
+ };
+ }
+
+ private ScalingCallback mockScalingCallback(final AtomicBoolean successCallback) {
+ return new ScalingCallback() {
+
+ @Override
+ public void onSuccess() {
+ successCallback.set(true);
+ }
+
+ @Override
+ public void onFailure() {
+ }
+ };
+ }
+
@SneakyThrows(ReflectiveOperationException.class)
private void resetRegistryRepositoryAvailable() {
ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json
new file mode 100644
index 0000000..f659188
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/config_sharding_sphere_jdbc_target.json
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "ruleConfiguration": {
+ "source": {
+ "type": "shardingSphereJdbc",
+ "parameter": {
+ "dataSource": "dataSources:\n ds_0:\n dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n props:\n jdbcUrl: jdbc:mysql://127.0.0.1:3306/demo_ds_0?serverTimezone=UTC&useSSL=false\n username: root\n password: 'password'\n connectionTimeout: 30000\n idleTimeout: 60000\n maxLifetime: 1800000\n maxPoolSize: 50\n minPoolSize: 1\n maintenanceIntervalMilliseconds: 30000\n readOnly: false\n",
+ "rule": "rules:\n- !SHARDING\n defaultDatabaseStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: user_id\n tables:\n t1:\n actualDataNodes: ds_0.t1\n keyGenerateStrategy:\n column: order_id\n logicTable: t1\n tableStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: order_id\n t2:\n actualDataNodes: ds_0.t2\n keyGenerateStrategy:\n column: order_item_i [...]
+ }
+ },
+ "target": {
+ "type": "shardingSphereJdbc",
+ "parameter": {
+ "dataSource": "dataSources:\n ds_0:\n dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n props:\n jdbcUrl: jdbc:mysql://127.0.0.1:3306/demo_ds_1?serverTimezone=UTC&useSSL=false\n username: root\n password: 'password'\n connectionTimeout: 30000\n idleTimeout: 60000\n maxLifetime: 1800000\n maxPoolSize: 50\n minPoolSize: 1\n maintenanceIntervalMilliseconds: 30000\n readOnly: false\n",
+ "rule": "rules:\n- !SHARDING\n defaultDatabaseStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: user_id\n tables:\n t1:\n actualDataNodes: ds_0.t1\n keyGenerateStrategy:\n column: order_id\n logicTable: t1\n tableStrategy:\n standard:\n shardingAlgorithmName: inline\n shardingColumn: order_id\n t2:\n actualDataNodes: ds_0.t2\n keyGenerateStrategy:\n column: order_item_i [...]
+ }
+ }
+ },
+ "jobConfiguration": {
+ "concurrency": 3
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_1.yaml b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_1.yaml
deleted file mode 100644
index 4a327d9..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_1.yaml
+++ /dev/null
@@ -1,64 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-schemaName: sharding_db
-
-dataSources:
- ds_0:
- url: jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false
- username: scaling
- password: scaling
- connectionTimeoutMilliseconds: 30000
- idleTimeoutMilliseconds: 60000
- maxLifetimeMilliseconds: 1800000
- maxPoolSize: 50
- minPoolSize: 1
- maintenanceIntervalMilliseconds: 30000
- ds_1:
- url: jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false
- username: scaling
- password: scaling
- connectionTimeoutMilliseconds: 30000
- idleTimeoutMilliseconds: 60000
- maxLifetimeMilliseconds: 1800000
- maxPoolSize: 50
- minPoolSize: 1
- maintenanceIntervalMilliseconds: 30000
-
-rules:
- - !SHARDING
- tables:
- t_order:
- actualDataNodes: ds_$->{0..1}.t_order_$->{0..1}
- logicTable: t_order
- databaseStrategy:
- standard:
- shardingColumn: id
- shardingAlgorithmName: t_order_db_algorith
- tableStrategy:
- standard:
- shardingColumn: user_id
- shardingAlgorithmName: t_order_tbl_algorith
- shardingAlgorithms:
- t_order_db_algorith:
- type: INLINE
- props:
- algorithm-expression: ds_$->{id % 2}
- t_order_tbl_algorith:
- type: INLINE
- props:
- algorithm-expression: t_order_$->{user_id % 2}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_2.yaml b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_2.yaml
deleted file mode 100644
index 2bc38f2..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/proxy_config-sharding_2.yaml
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-schemaName: sharding_db
-
-dataSources:
- ds_0:
- url: jdbc:mysql://192.168.0.1:3306/scaling?serverTimezone=UTC&useSSL=false
- username: scaling
- password: scaling
- connectionTimeoutMilliseconds: 30000
- idleTimeoutMilliseconds: 60000
- maxLifetimeMilliseconds: 1800000
- maxPoolSize: 50
- minPoolSize: 1
- maintenanceIntervalMilliseconds: 30000
- ds_1:
- url: jdbc:mysql://192.168.0.2:3306/scaling?serverTimezone=UTC&useSSL=false
- username: scaling
- password: scaling
- connectionTimeoutMilliseconds: 30000
- idleTimeoutMilliseconds: 60000
- maxLifetimeMilliseconds: 1800000
- maxPoolSize: 50
- minPoolSize: 1
- maintenanceIntervalMilliseconds: 30000
- ds_2:
- url: jdbc:mysql://192.168.0.3:3306/scaling?serverTimezone=UTC&useSSL=false
- username: scaling
- password: scaling
- connectionTimeoutMilliseconds: 30000
- idleTimeoutMilliseconds: 60000
- maxLifetimeMilliseconds: 1800000
- maxPoolSize: 50
- minPoolSize: 1
- maintenanceIntervalMilliseconds: 30000
- ds_3:
- url: jdbc:mysql://192.168.0.4:3306/scaling?serverTimezone=UTC&useSSL=false
- username: scaling
- password: scaling
- connectionTimeoutMilliseconds: 30000
- idleTimeoutMilliseconds: 60000
- maxLifetimeMilliseconds: 1800000
- maxPoolSize: 50
- minPoolSize: 1
- maintenanceIntervalMilliseconds: 30000
-
-rules:
- - !SHARDING
- tables:
- t_order:
- actualDataNodes: ds_$->{0..3}.t_order_$->{0..3}
- logicTable: t_order
- databaseStrategy:
- standard:
- shardingColumn: id
- shardingAlgorithmName: t_order_db_algorith
- tableStrategy:
- standard:
- shardingColumn: user_id
- shardingAlgorithmName: t_order_tbl_algorith
- shardingAlgorithms:
- t_order_db_algorith:
- type: INLINE
- props:
- algorithm-expression: ds_$->{id % 4}
- t_order_tbl_algorith:
- type: INLINE
- props:
- algorithm-expression: t_order_$->{user_id % 4}