You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/01/28 03:02:04 UTC
[shardingsphere] branch master updated: Rewrite scaling position
persist and resume (#9195)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cb82953 Rewrite scaling position persist and resume (#9195)
cb82953 is described below
commit cb82953e4d384907965198a7a42fcc8551d77905
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jan 28 11:01:26 2021 +0800
Rewrite scaling position persist and resume (#9195)
* Remove scaling position manager and resumer.
* Rewrite scaling position persist and resume.
* update exception message.
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/fixture/FixtureH2ScalingEntry.java | 6 +-
...anager.java => FixturePositionInitializer.java} | 13 +-
.../fixture/FixtureResumeBreakPointManager.java | 38 -----
.../scaling/web/HttpServerHandlerTest.java | 11 --
.../scaling/core/api/JobSchedulerCenter.java | 79 +++++++++
.../RegistryRepositoryAPI.java} | 44 +++--
.../core/api/RegistryRepositoryAPIImpl.java | 59 +++++++
.../scaling/core/config/DumperConfiguration.java | 4 +-
.../scaling/core/config/JobConfiguration.java | 2 +
.../execute/executor/AbstractScalingExecutor.java | 5 -
.../executor/dumper/AbstractJDBCDumper.java | 2 +-
.../scaling/core/job/ScalingJob.java | 6 +-
.../scaling/core/job/position/JobPosition.java | 166 +++++++++++++++++++
...sitionManager.java => PositionInitializer.java} | 29 ++--
...actory.java => PositionInitializerFactory.java} | 32 ++--
.../resume/AbstractResumeBreakPointManager.java | 157 ------------------
.../resume/FileSystemResumeBreakPointManager.java | 51 ------
.../RegistryRepositoryResumeBreakPointManager.java | 43 -----
.../position/resume/ResumeBreakPointManager.java | 75 ---------
.../resume/ResumeBreakPointManagerFactory.java | 55 -------
.../core/job/preparer/ScalingJobPreparer.java | 50 +++---
.../checker/AbstractDataSourceChecker.java | 2 +-
.../preparer/resumer/ScalingPositionResumer.java | 116 -------------
.../preparer/splitter/InventoryTaskSplitter.java | 87 ++++++----
.../scaling/core/job/task/ScalingTask.java | 8 +-
.../core/job/task/incremental/IncrementalTask.java | 20 ++-
.../core/job/task/inventory/InventoryTask.java | 18 +-
.../core/schedule/ScalingTaskScheduler.java | 1 -
.../scaling/core/spi/ScalingEntry.java | 8 +-
.../scaling/core/utils/RdbmsConfigurationUtil.java | 12 +-
.../scaling/core/utils/ReflectionUtil.java | 16 ++
.../core/utils/ScalingConfigurationUtil.java | 181 +++++++++++++++++++++
.../scaling/core/utils/ScalingTaskUtil.java | 2 +-
.../core/fixture/FixtureH2ScalingEntry.java | 6 +-
...anager.java => FixturePositionInitializer.java} | 13 +-
.../fixture/FixtureResumeBreakPointManager.java | 38 -----
.../AbstractResumeBreakPointManagerTest.java | 108 ------------
.../FileSystemResumeBreakPointManagerTest.java | 54 ------
...istryRepositoryResumeBreakPointManagerTest.java | 68 --------
.../resumer/ScalingPositionResumerTest.java | 76 ---------
.../splitter/InventoryTaskSplitterTest.java | 32 ++--
.../job/task/incremental/IncrementalTaskTest.java | 7 +-
.../core/job/task/inventory/InventoryTaskTest.java | 9 +-
.../impl/StandaloneScalingJobServiceTest.java | 11 --
.../scaling/mysql/MySQLScalingEntry.java | 8 +-
...nManager.java => MySQLPositionInitializer.java} | 36 +---
.../scaling/mysql/MySQLScalingEntryTest.java | 4 +-
...Test.java => MySQLPositionInitializerTest.java} | 25 +--
.../scaling/postgresql/PostgreSQLScalingEntry.java | 8 +-
...ger.java => PostgreSQLPositionInitializer.java} | 31 +---
.../postgresql/component/PostgreSQLWalDumper.java | 2 +-
.../postgresql/PostgreSQLScalingEntryTest.java | 4 +-
...java => PostgreSQLPositionInitializerTest.java} | 25 +--
.../component/PostgreSQLWalDumperTest.java | 4 +-
.../scaling/elasticjob/job/ScalingElasticJob.java | 3 +
55 files changed, 754 insertions(+), 1216 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index 478558b..99229ac 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -39,8 +39,8 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
- return FixturePositionManager.class;
+ public Class<? extends PositionInitializer> getPositionInitializer() {
+ return FixturePositionInitializer.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionInitializer.java
similarity index 77%
rename from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionInitializer.java
index ac69629..201d333 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixturePositionInitializer.java
@@ -18,17 +18,14 @@
package org.apache.shardingsphere.scaling.fixture;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import javax.sql.DataSource;
-public final class FixturePositionManager extends PositionManager {
+public final class FixturePositionInitializer implements PositionInitializer<PlaceholderPosition> {
- public FixturePositionManager(final DataSource dataSource) {
- super(new PlaceholderPosition());
- }
-
- public FixturePositionManager(final String position) {
- super(new PlaceholderPosition());
+ @Override
+ public PlaceholderPosition init(final DataSource dataSource) {
+ return new PlaceholderPosition();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureResumeBreakPointManager.java
deleted file mode 100644
index 2233312..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureResumeBreakPointManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.fixture;
-
-import org.apache.shardingsphere.scaling.core.job.position.resume.AbstractResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-
-public final class FixtureResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-
- public FixtureResumeBreakPointManager(final String databaseType, final String taskPath) {
- super(databaseType, taskPath);
- }
-
- @Override
- public String getPosition(final String path) {
- return null;
- }
-
- @Override
- public void persistPosition(final String path, final String data) {
-
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index 60abb85..4f60b9e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -32,12 +32,8 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
-import org.apache.shardingsphere.scaling.core.job.position.resume.FileSystemResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.apache.shardingsphere.scaling.fixture.FixtureResumeBreakPointManager;
import org.apache.shardingsphere.scaling.util.ScalingConfigurationUtil;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -67,7 +63,6 @@ public final class HttpServerHandlerTest {
public void setUp() {
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(TaskExecuteEngine.class));
- ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FixtureResumeBreakPointManager.class);
httpServerHandler = new HttpServerHandler();
}
@@ -190,10 +185,4 @@ public final class HttpServerHandlerTest {
JsonObject jsonObject = new Gson().fromJson(fullHttpResponse.content().toString(CharsetUtil.UTF_8), JsonObject.class);
return jsonObject.get("model").getAsJsonObject().get("jobId").getAsLong();
}
-
- @After
- @SneakyThrows(ReflectiveOperationException.class)
- public void tearDown() {
- ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FileSystemResumeBreakPointManager.class);
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobSchedulerCenter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobSchedulerCenter.java
new file mode 100644
index 0000000..9db9feb
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/JobSchedulerCenter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api;
+
+import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Job scheduler center.
+ */
+@Slf4j
+public final class JobSchedulerCenter {
+
+ private static final Map<String, ScalingJob> SCALING_JOB_MAP = Maps.newConcurrentMap();
+
+ private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
+
+ private static final RegistryRepositoryAPI REGISTRY_REPOSITORY_API = new RegistryRepositoryAPIImpl();
+
+ static {
+ JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 1, 1, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Add job.
+ *
+ * @param scalingJob scheduler job
+ */
+ public static void addJob(final ScalingJob scalingJob) {
+ SCALING_JOB_MAP.put(String.format("%d-%d", scalingJob.getJobId(), scalingJob.getShardingItem()), scalingJob);
+ }
+
+ /**
+ * Remove job.
+ *
+ * @param scalingJob scheduler job
+ */
+ public static void removeJob(final ScalingJob scalingJob) {
+ SCALING_JOB_MAP.remove(String.format("%d-%d", scalingJob.getJobId(), scalingJob.getShardingItem()));
+ }
+
+ private static final class PersistJobContextRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ for (Map.Entry<String, ScalingJob> entry : SCALING_JOB_MAP.entrySet()) {
+ try {
+ REGISTRY_REPOSITORY_API.persistJobPosition(entry.getValue());
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("persist job {} context failed.", entry.getKey(), ex);
+ }
+ }
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
similarity index 57%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
index e685d50..5227060 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
@@ -15,31 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.api;
-import lombok.Getter;
-import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.JobPosition;
/**
- * Job configuration.
+ * Registry repository API.
*/
-@Setter
-@Getter
-public final class JobConfiguration {
-
- private Long jobId;
-
- private int concurrency = 3;
-
- private int retryTimes = 3;
-
- private String[] shardingTables;
-
- private Integer shardingItem;
-
- private int shardingSize = 1000 * 10000;
-
- private boolean running = true;
-
- private WorkflowConfiguration workflowConfig;
+public interface RegistryRepositoryAPI {
+
+ /**
+ * persist job position.
+ *
+ * @param scalingJob scaling job
+ */
+ void persistJobPosition(ScalingJob scalingJob);
+
+ /**
+ * Get job position.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job position
+ */
+ JobPosition getJobPosition(long jobId, int shardingItem);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java
new file mode 100644
index 0000000..54c3ce9
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPIImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.api;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.JobPosition;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
+import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+
+import java.util.stream.Collectors;
+
+/**
+ * Registry repository API impl.
+ */
+@Slf4j
+public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
+
+ private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.getInstance();
+
+ @Override
+ public void persistJobPosition(final ScalingJob scalingJob) {
+ JobPosition jobPosition = new JobPosition();
+ jobPosition.setStatus(scalingJob.getStatus());
+ jobPosition.setDatabaseType(scalingJob.getScalingConfig().getJobConfiguration().getDatabaseType());
+ jobPosition.setIncrementalPositions(scalingJob.getIncrementalTasks().stream().collect(Collectors.toMap(ScalingTask::getTaskId, ScalingTask::getPosition)));
+ jobPosition.setInventoryPositions(scalingJob.getInventoryTasks().stream().collect(Collectors.toMap(ScalingTask::getTaskId, ScalingTask::getPosition)));
+ REGISTRY_REPOSITORY.persist(ScalingTaskUtil.getScalingListenerPath(scalingJob.getJobId(), scalingJob.getShardingItem()), jobPosition.toJson());
+ }
+
+ @Override
+ public JobPosition getJobPosition(final long jobId, final int shardingItem) {
+ String data = null;
+ try {
+ data = REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, shardingItem));
+ } catch (final NullPointerException ex) {
+ log.info("job {}-{} without break point.", jobId, shardingItem);
+ }
+ return Strings.isNullOrEmpty(data) ? null : JobPosition.fromJson(data);
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
index d6ed7fa..77ad701 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import java.util.Map;
@@ -35,7 +35,7 @@ public class DumperConfiguration {
private ScalingDataSourceConfiguration dataSourceConfig;
- private PositionManager positionManager;
+ private Position<?> position;
private Map<String, String> tableNameMap;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index e685d50..82057b5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -41,5 +41,7 @@ public final class JobConfiguration {
private boolean running = true;
+ private String databaseType;
+
private WorkflowConfiguration workflowConfig;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java
index 5b6325f..55756a1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractScalingExecutor.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.scaling.core.execute.executor;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
/**
* Abstract scaling executor.
@@ -33,10 +32,6 @@ public abstract class AbstractScalingExecutor implements ScalingExecutor {
@Getter(AccessLevel.PROTECTED)
private boolean running;
- private String taskId;
-
- private PositionManager positionManager;
-
@Override
public void start() {
running = true;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index b2d9ff1..61a8648 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -111,7 +111,7 @@ public abstract class AbstractJDBCDumper extends AbstractScalingExecutor impleme
if (null == inventoryDumperConfig.getPrimaryKey()) {
return new PlaceholderPosition();
}
- return new PrimaryKeyPosition(rs.getLong(inventoryDumperConfig.getPrimaryKey()), ((PrimaryKeyPosition) inventoryDumperConfig.getPositionManager().getPosition()).getEndValue());
+ return new PrimaryKeyPosition(rs.getLong(inventoryDumperConfig.getPrimaryKey()), ((PrimaryKeyPosition) inventoryDumperConfig.getPosition()).getEndValue());
}
protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
index ce5656ec..ab65d9a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ScalingJob.java
@@ -21,7 +21,7 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
+import org.apache.shardingsphere.scaling.core.job.position.JobPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
@@ -46,6 +46,8 @@ public final class ScalingJob {
private String databaseType;
+ private JobPosition initPosition;
+
private final transient List<TaskConfiguration> taskConfigs = new LinkedList<>();
private final transient List<ScalingTask> inventoryTasks = new LinkedList<>();
@@ -56,8 +58,6 @@ public final class ScalingJob {
private String status = JobStatus.RUNNING.name();
- private transient ResumeBreakPointManager resumeBreakPointManager;
-
public ScalingJob() {
this(generateKey());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/JobPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/JobPosition.java
new file mode 100644
index 0000000..baeb112
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/JobPosition.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position;
+
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Position group.
+ */
+@Getter
+@Setter
+public final class JobPosition {
+
+ private static final Gson GSON = new Gson();
+
+ private static final Gson INVENTORY_POSITION_ADAPTED_GSON = new GsonBuilder().registerTypeHierarchyAdapter(Position.class, new InventoryPositionTypeAdapter()).create();
+
+ private String status;
+
+ private String databaseType;
+
+ private Map<String, Position<?>> inventoryPositions;
+
+ private Map<String, Position<?>> incrementalPositions;
+
+ /**
+ * Get incremental position.
+ *
+ * @param dataSourceName data source name
+ * @return incremental position
+ */
+ public Position<?> getIncrementalPosition(final String dataSourceName) {
+ return incrementalPositions.get(dataSourceName);
+ }
+
+ /**
+ * Get inventory position.
+ *
+ * @param tableName table name
+ * @return inventory position
+ */
+ public Map<String, Position<?>> getInventoryPosition(final String tableName) {
+ Pattern pattern = Pattern.compile(String.format("%s(#\\d+)?", tableName));
+ return inventoryPositions.entrySet().stream()
+ .filter(entry -> pattern.matcher(entry.getKey()).find())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ /**
+ * To json.
+ *
+ * @return json data
+ */
+ public String toJson() {
+ JsonObject result = new JsonObject();
+ result.addProperty("status", status);
+ result.addProperty("databaseType", databaseType);
+ result.add("inventory", getInventoryJson());
+ result.add("incremental", getIncrementalJson());
+ return result.toString();
+ }
+
+ private JsonObject getInventoryJson() {
+ JsonObject result = new JsonObject();
+ JsonArray finished = new JsonArray();
+ JsonObject unfinished = new JsonObject();
+ for (Map.Entry<String, Position<?>> entry : inventoryPositions.entrySet()) {
+ if (entry.getValue() instanceof FinishedPosition) {
+ finished.add(entry.getKey());
+ continue;
+ }
+ unfinished.add(entry.getKey(), GSON.toJsonTree(entry.getValue(), entry.getValue().getClass()));
+ }
+ result.add("finished", finished);
+ result.add("unfinished", unfinished);
+ return result;
+ }
+
+ private JsonObject getIncrementalJson() {
+ JsonObject result = new JsonObject();
+ for (Map.Entry<String, Position<?>> entry : incrementalPositions.entrySet()) {
+ result.add(entry.getKey(), GSON.toJsonTree(entry.getValue(), entry.getClass()));
+ }
+ return result;
+ }
+
+ /**
+ * From json.
+ *
+ * @param data json data
+ * @return job position
+ */
+ public static JobPosition fromJson(final String data) {
+ JobPosition result = new JobPosition();
+ JsonObject jsonObject = GSON.fromJson(data, JsonObject.class);
+ result.setStatus(jsonObject.get("status").getAsString());
+ result.setDatabaseType(jsonObject.get("databaseType").getAsString());
+ result.setInventoryPositions(getInventoryPositions(jsonObject.get("inventory").getAsJsonObject()));
+ result.setIncrementalPositions(getIncrementalPositions(jsonObject.get("incremental").getAsJsonObject(), jsonObject.get("databaseType").getAsString()));
+ return result;
+ }
+
+ private static Map<String, Position<?>> getInventoryPositions(final JsonObject inventory) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.add("inventory", inventory);
+ return INVENTORY_POSITION_ADAPTED_GSON.fromJson(jsonObject, JobPosition.class).getInventoryPositions();
+ }
+
+ private static Map<String, Position<?>> getIncrementalPositions(final JsonObject incremental, final String databaseType) {
+ Class<?> incrementalPositionClass = PositionInitializerFactory.getPositionClass(databaseType);
+ Map<String, Position<?>> result = Maps.newHashMap();
+ for (String each : incremental.keySet()) {
+ result.put(each, (Position<?>) GSON.fromJson(incremental.get(each), incrementalPositionClass));
+ }
+ return result;
+ }
+
+ private static class InventoryPositionTypeAdapter extends TypeAdapter<Position<?>> {
+
+ @Override
+ public void write(final JsonWriter out, final Position<?> value) throws IOException {
+ if (value instanceof PrimaryKeyPosition) {
+ new PrimaryKeyPosition.PositionTypeAdapter().write(out, (PrimaryKeyPosition) value);
+ } else if (value instanceof PlaceholderPosition) {
+ new PlaceholderPosition.PositionTypeAdapter().write(out, (PlaceholderPosition) value);
+ }
+ }
+
+ @Override
+ public Position<?> read(final JsonReader in) throws IOException {
+ in.beginArray();
+ Position<?> result = in.hasNext() ? new PrimaryKeyPosition(in.nextLong(), in.nextLong()) : new PlaceholderPosition();
+ in.endArray();
+ return result;
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
similarity index 69%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
index 3f2597c..4727d32 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializer.java
@@ -17,27 +17,20 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import lombok.Getter;
-import lombok.Setter;
-
import javax.sql.DataSource;
+import java.sql.SQLException;
/**
- * Scaling position manager.
+ * Position initializer.
*/
-@Getter
-@Setter
-public class PositionManager {
-
- private DataSource dataSource;
-
- private Position<?> position;
-
- public PositionManager(final DataSource dataSource) {
- this.dataSource = dataSource;
- }
+public interface PositionInitializer<T extends Position<?>> {
- public PositionManager(final Position<?> position) {
- this.position = position;
- }
+ /**
+ * Init position by data source.
+ *
+ * @param dataSource data source
+ * @return position
+ * @throws SQLException SQL exception
+ */
+ T init(DataSource dataSource) throws SQLException;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
similarity index 50%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
index fe0ed84..33717c3 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
@@ -17,43 +17,33 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-
-import javax.sql.DataSource;
+import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
/**
- * Position manager factory.
+ * Position initializer factory.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PositionManagerFactory {
+public final class PositionInitializerFactory {
/**
- * New instance of position manager.
+ * New instance of position initializer.
*
* @param databaseType database type
- * @param dataSource data source
- * @return position manager
+ * @return position initializer
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static PositionManager newInstance(final String databaseType, final DataSource dataSource) {
- ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+ public static PositionInitializer<?> newInstance(final String databaseType) {
+ return ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType).getPositionInitializer().newInstance();
}
/**
- * New instance of position manager.
+ * Get position type.
*
* @param databaseType database type
- * @param position position
- * @return position manager
+ * @return position type
*/
- @SneakyThrows(ReflectiveOperationException.class)
- public static PositionManager newInstance(final String databaseType, final String position) {
- ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
+ public static Class<?> getPositionClass(final String databaseType) {
+ return ReflectionUtil.getInterfaceGenericClass(ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType).getPositionInitializer());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
deleted file mode 100644
index 8b71ecb..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
-import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionGroup;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
-
-import java.io.Closeable;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * Abstract resume from break-point manager.
- */
-@Slf4j
-public abstract class AbstractResumeBreakPointManager implements ResumeBreakPointManager, Closeable {
-
- private static final Gson GSON = new Gson();
-
- @Getter
- private final Map<String, PositionManager> inventoryPositionManagerMap = Maps.newConcurrentMap();
-
- @Getter
- private final Map<String, PositionManager> incrementalPositionManagerMap = Maps.newConcurrentMap();
-
- @Getter
- private boolean resumable;
-
- private final String databaseType;
-
- private final String taskPath;
-
- private final ScheduledExecutorService executor;
-
- public AbstractResumeBreakPointManager(final String databaseType, final String taskPath) {
- this.databaseType = databaseType;
- this.taskPath = taskPath;
- resumePosition();
- executor = Executors.newSingleThreadScheduledExecutor();
- executor.scheduleWithFixedDelay(this::persistPosition, 1, 1, TimeUnit.MINUTES);
- }
-
- private void resumePosition() {
- try {
- resumeInventoryPosition(getInventoryPath());
- resumeIncrementalPosition(getIncrementalPath());
- resumable = !inventoryPositionManagerMap.isEmpty() && !incrementalPositionManagerMap.isEmpty();
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("resume position failed.");
- throw ex;
- }
- }
-
- protected void resumeInventoryPosition(final String path) {
- String data = getPosition(path);
- if (Strings.isNullOrEmpty(data)) {
- return;
- }
- log.info("resume inventory position from {} = {}", taskPath, data);
- InventoryPositionGroup inventoryPositionGroup = InventoryPositionGroup.fromJson(data);
- Map<String, Position<?>> unfinished = inventoryPositionGroup.getUnfinished();
- for (Entry<String, Position<?>> entry : unfinished.entrySet()) {
- inventoryPositionManagerMap.put(entry.getKey(), new PositionManager(entry.getValue()));
- }
- for (String each : inventoryPositionGroup.getFinished()) {
- inventoryPositionManagerMap.put(each, new PositionManager(new FinishedPosition()));
- }
- }
-
- protected void resumeIncrementalPosition(final String path) {
- String data = getPosition(path);
- if (Strings.isNullOrEmpty(data)) {
- return;
- }
- log.info("resume incremental position from {} = {}", taskPath, data);
- Map<String, Object> incrementalPosition = GSON.<Map<String, Object>>fromJson(data, Map.class);
- for (Entry<String, Object> entry : incrementalPosition.entrySet()) {
- incrementalPositionManagerMap.put(entry.getKey(), PositionManagerFactory.newInstance(databaseType, entry.getValue().toString()));
- }
- }
-
- @Override
- public void persistPosition() {
- try {
- persistIncrementalPosition();
- persistInventoryPosition();
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("persist position failed.", ex);
- }
- }
-
- private void persistInventoryPosition() {
- InventoryPositionGroup inventoryPositionGroup = new InventoryPositionGroup();
- for (Entry<String, PositionManager> entry : inventoryPositionManagerMap.entrySet()) {
- if (entry.getValue().getPosition() instanceof FinishedPosition) {
- inventoryPositionGroup.getFinished().add(entry.getKey());
- continue;
- }
- inventoryPositionGroup.getUnfinished().put(entry.getKey(), entry.getValue().getPosition());
- }
- String data = inventoryPositionGroup.toJson();
- log.info("persist inventory position {} = {}", getInventoryPath(), data);
- persistPosition(getInventoryPath(), data);
- }
-
- private void persistIncrementalPosition() {
- String data = GSON.toJson(incrementalPositionManagerMap.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getPosition())));
- log.info("persist incremental position {} = {}", getIncrementalPath(), data);
- persistPosition(getIncrementalPath(), data);
- }
-
- protected String getInventoryPath() {
- return String.format("%s/%s", taskPath, ScalingConstant.INVENTORY);
- }
-
- protected String getIncrementalPath() {
- return String.format("%s/%s", taskPath, ScalingConstant.INCREMENTAL);
- }
-
- @Override
- public void close() {
- executor.submit((Runnable) this::persistPosition);
- executor.shutdown();
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
deleted file mode 100644
index 284cbe2..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-/**
- * File system resume from break-point manager as default.
- */
-public final class FileSystemResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-
- public FileSystemResumeBreakPointManager(final String databaseType, final String taskPath) {
- super(databaseType, taskPath.startsWith("/") ? ".scaling" + taskPath : taskPath);
- }
-
- @Override
- @SneakyThrows(IOException.class)
- public String getPosition(final String path) {
- File file = new File(path);
- if (!file.exists()) {
- return null;
- }
- return FileUtils.readFileToString(file, StandardCharsets.UTF_8);
- }
-
- @Override
- @SneakyThrows(IOException.class)
- public void persistPosition(final String path, final String data) {
- FileUtils.writeStringToFile(new File(path), data, StandardCharsets.UTF_8);
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManager.java
deleted file mode 100644
index 91fedb6..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManager.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-
-/**
- * Registry repository resume from break-point manager.
- */
-public final class RegistryRepositoryResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-
- private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.getInstance();
-
- public RegistryRepositoryResumeBreakPointManager(final String databaseType, final String taskPath) {
- super(databaseType, taskPath);
- }
-
- @Override
- public String getPosition(final String path) {
- return REGISTRY_REPOSITORY.get(path);
- }
-
- @Override
- public void persistPosition(final String path, final String data) {
- REGISTRY_REPOSITORY.persist(path, data);
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
deleted file mode 100644
index d52f78e..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-
-import java.util.Map;
-
-/**
- * Resume from break-point manager interface.
- */
-public interface ResumeBreakPointManager {
-
- /**
- * If has resumable data.
- *
- * @return is resumable
- */
- boolean isResumable();
-
- /**
- * Get inventory position map.
- *
- * @return inventory position map
- */
- Map<String, PositionManager> getInventoryPositionManagerMap();
-
- /**
- * Get incremental position map.
- *
- * @return incremental position map
- */
- Map<String, PositionManager> getIncrementalPositionManagerMap();
-
- /**
- * Get position.
- *
- * @param path path
- * @return data
- */
- String getPosition(String path);
-
- /**
- * Persist position immediately.
- */
- void persistPosition();
-
- /**
- * Persist position.
- *
- * @param path path
- * @param data data
- */
- void persistPosition(String path, String data);
-
- /**
- * Close this manager.
- */
- void close();
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
deleted file mode 100644
index efcd723..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManagerFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
-import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-
-/**
- * Resume from break-point manager factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ResumeBreakPointManagerFactory {
-
- private static Class<? extends ResumeBreakPointManager> clazz = FileSystemResumeBreakPointManager.class;
-
- static {
- ShardingSphereServiceLoader.register(RegistryRepository.class);
- ShardingSphereServiceLoader.register(ConfigurationRepository.class);
- if (RegistryRepositoryHolder.isAvailable()) {
- clazz = RegistryRepositoryResumeBreakPointManager.class;
- }
- }
-
- /**
- * New resume from break-point manager instance.
- *
- * @param databaseType database type
- * @param taskPath task path for persist data.
- * @return resume from break-point manager
- */
- @SneakyThrows(ReflectiveOperationException.class)
- public static ResumeBreakPointManager newInstance(final String databaseType, final String taskPath) {
- return clazz.getConstructor(String.class, String.class).newInstance(databaseType, taskPath);
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
index f8a369f..2013895 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ScalingJobPreparer.java
@@ -19,24 +19,21 @@ package org.apache.shardingsphere.scaling.core.job.preparer;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializerFactory;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceCheckerFactory;
-import org.apache.shardingsphere.scaling.core.job.preparer.resumer.ScalingPositionResumer;
import org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryTaskSplitter;
import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTaskFactory;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+import org.apache.shardingsphere.scaling.core.utils.ScalingConfigurationUtil;
+import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
@@ -50,35 +47,26 @@ public final class ScalingJobPreparer {
private final InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter();
- private final ScalingPositionResumer scalingPositionResumer = new ScalingPositionResumer();
-
/**
* Do prepare work for scaling job.
*
* @param scalingJob scaling job
*/
public void prepare(final ScalingJob scalingJob) {
+ ScalingConfigurationUtil.fillInProperties(scalingJob.getScalingConfig());
try (DataSourceManager dataSourceManager = new DataSourceManager(scalingJob.getTaskConfigs())) {
- checkSourceDataSources(scalingJob, dataSourceManager);
- ResumeBreakPointManager resumeBreakPointManager = getResumeBreakPointManager(scalingJob);
- scalingJob.setResumeBreakPointManager(resumeBreakPointManager);
- if (resumeBreakPointManager.isResumable()) {
- scalingPositionResumer.resumePosition(scalingJob, dataSourceManager, resumeBreakPointManager);
- } else {
- checkTargetDataSources(scalingJob, dataSourceManager);
- initIncrementalTasks(scalingJob, dataSourceManager);
- initInventoryTasks(scalingJob, dataSourceManager);
- scalingPositionResumer.persistPosition(scalingJob, resumeBreakPointManager);
- }
- } catch (final PrepareFailedException ex) {
+ checkDataSource(scalingJob, dataSourceManager);
+ initIncrementalTasks(scalingJob, dataSourceManager);
+ initInventoryTasks(scalingJob, dataSourceManager);
+ } catch (final PrepareFailedException | SQLException ex) {
log.error("Preparing scaling job {} failed", scalingJob.getJobId(), ex);
scalingJob.setStatus(JobStatus.PREPARING_FAILURE.name());
}
}
- private ResumeBreakPointManager getResumeBreakPointManager(final ScalingJob scalingJob) {
- return ResumeBreakPointManagerFactory.newInstance(scalingJob.getDatabaseType(),
- ScalingTaskUtil.getScalingListenerPath(scalingJob.getJobId(), ScalingConstant.POSITION, scalingJob.getShardingItem()));
+ private void checkDataSource(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+ checkSourceDataSources(scalingJob, dataSourceManager);
+ checkTargetDataSources(scalingJob, dataSourceManager);
}
private void checkSourceDataSources(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
@@ -96,16 +84,22 @@ public final class ScalingJobPreparer {
private void initInventoryTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
List<ScalingTask> allInventoryTasks = new LinkedList<>();
for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
- allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(scalingJob.getDatabaseType(), each, dataSourceManager));
+ allInventoryTasks.addAll(inventoryTaskSplitter.splitInventoryData(scalingJob, each, dataSourceManager));
}
scalingJob.getInventoryTasks().addAll(allInventoryTasks);
}
- private void initIncrementalTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) {
+ private void initIncrementalTasks(final ScalingJob scalingJob, final DataSourceManager dataSourceManager) throws SQLException {
for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
- ScalingDataSourceConfiguration dataSourceConfig = each.getDumperConfig().getDataSourceConfig();
- each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(scalingJob.getDatabaseType(), dataSourceManager.getDataSource(dataSourceConfig)));
+ each.getDumperConfig().setPosition(getIncrementalPosition(scalingJob, each, dataSourceManager));
scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
}
}
+
+ private Position<?> getIncrementalPosition(final ScalingJob scalingJob, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager) throws SQLException {
+ if (null != scalingJob.getInitPosition()) {
+ return scalingJob.getInitPosition().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
+ }
+ return PositionInitializerFactory.newInstance(taskConfig.getJobConfig().getDatabaseType()).init(dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()));
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java
index fb59dce..2523a1d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/checker/AbstractDataSourceChecker.java
@@ -58,7 +58,7 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
try (PreparedStatement preparedStatement = dataSource.getConnection().prepareStatement(getSqlBuilder().buildCheckEmptySQL(each));
ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
- throw new PrepareFailedException(String.format("Target table [%s] not empty!", each));
+ throw new PrepareFailedException(String.format("Target table [%s] is not empty!", each));
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumer.java
deleted file mode 100644
index ef9cb5a..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
-
-import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
-import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.scaling.core.job.task.ScalingTaskFactory;
-import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-/**
- * Scaling position resumer.
- */
-public final class ScalingPositionResumer {
-
- private final ScalingTaskFactory scalingTaskFactory = new DefaultScalingTaskFactory();
-
- /**
- * Resume position from resume from break-point manager.
- *
- * @param scalingJob scaling job
- * @param dataSourceManager dataSource manager
- * @param resumeBreakPointManager resume from break-point manager
- */
- public void resumePosition(final ScalingJob scalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
- resumeInventoryPosition(scalingJob, dataSourceManager, resumeBreakPointManager);
- resumeIncrementalPosition(scalingJob, resumeBreakPointManager);
- }
-
- private void resumeInventoryPosition(final ScalingJob scalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
- scalingJob.getInventoryTasks().addAll(getAllInventoryTasks(scalingJob, dataSourceManager, resumeBreakPointManager));
- }
-
- private List<ScalingTask> getAllInventoryTasks(final ScalingJob scalingJob,
- final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
- List<ScalingTask> result = new LinkedList<>();
- for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
- MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfig().getDataSourceConfig()));
- for (Entry<String, PositionManager> entry : getInventoryPositionMap(each.getDumperConfig(), resumeBreakPointManager).entrySet()) {
- result.add(scalingTaskFactory.createInventoryTask(newInventoryDumperConfig(each.getDumperConfig(), metaDataManager, entry), each.getImporterConfig()));
- }
- }
- return result;
- }
-
- private InventoryDumperConfiguration newInventoryDumperConfig(final DumperConfiguration dumperConfig, final MetaDataManager metaDataManager, final Entry<String, PositionManager> entry) {
- String[] splitTable = entry.getKey().split("#");
- InventoryDumperConfiguration result = new InventoryDumperConfiguration(dumperConfig);
- result.setTableName(splitTable[0].split("\\.")[1]);
- result.setPositionManager(entry.getValue());
- if (2 == splitTable.length) {
- result.setShardingItem(Integer.parseInt(splitTable[1]));
- }
- result.setPrimaryKey(metaDataManager.getTableMetaData(result.getTableName()).getPrimaryKeyColumns().get(0));
- return result;
- }
-
- private Map<String, PositionManager> getInventoryPositionMap(final DumperConfiguration dumperConfig, final ResumeBreakPointManager resumeBreakPointManager) {
- Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfig.getDataSourceName()));
- return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
- .filter(entry -> pattern.matcher(entry.getKey()).find())
- .collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
- }
-
- private void resumeIncrementalPosition(final ScalingJob scalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
- for (TaskConfiguration each : scalingJob.getTaskConfigs()) {
- each.getDumperConfig().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfig().getDataSourceName()));
- scalingJob.getIncrementalTasks().add(scalingTaskFactory.createIncrementalTask(each.getJobConfig().getConcurrency(), each.getDumperConfig(), each.getImporterConfig()));
- }
- }
-
- /**
- * Persist position.
- *
- * @param scalingJob scaling job
- * @param resumeBreakPointManager resume from break-point manager
- */
- public void persistPosition(final ScalingJob scalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
- for (ScalingTask each : scalingJob.getInventoryTasks()) {
- resumeBreakPointManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
- }
- for (ScalingTask each : scalingJob.getIncrementalTasks()) {
- resumeBreakPointManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
- }
- resumeBreakPointManager.persistPosition();
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
index c4df3f9..a948dd5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitter.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
@@ -24,10 +25,10 @@ import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguratio
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
-import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilderFactory;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
@@ -40,6 +41,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -55,30 +57,26 @@ public final class InventoryTaskSplitter {
/**
* Split inventory data to multi-tasks.
*
- * @param databaseType database type
+ * @param scalingJob scaling job
* @param taskConfig task configuration
* @param dataSourceManager data source manager
* @return split inventory data task
*/
- public Collection<ScalingTask> splitInventoryData(final String databaseType, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager) {
+ public Collection<ScalingTask> splitInventoryData(final ScalingJob scalingJob, final TaskConfiguration taskConfig, final DataSourceManager dataSourceManager) {
Collection<ScalingTask> result = new LinkedList<>();
- for (InventoryDumperConfiguration each : splitDumperConfig(databaseType, taskConfig.getJobConfig().getShardingSize(), taskConfig.getDumperConfig(), dataSourceManager)) {
+ for (InventoryDumperConfiguration each : splitDumperConfig(scalingJob, taskConfig.getDumperConfig(), dataSourceManager)) {
result.add(scalingTaskFactory.createInventoryTask(each, taskConfig.getImporterConfig()));
}
return result;
}
private Collection<InventoryDumperConfiguration> splitDumperConfig(
- final String databaseType, final int shardingSize, final DumperConfiguration dumperConfig, final DataSourceManager dataSourceManager) {
+ final ScalingJob scalingJob, final DumperConfiguration dumperConfig, final DataSourceManager dataSourceManager) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
MetaDataManager metaDataManager = new MetaDataManager(dataSource);
for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
- if (isSpiltByPrimaryKeyRange(each, metaDataManager)) {
- result.addAll(splitByPrimaryKeyRange(databaseType, shardingSize, each, metaDataManager, dataSource));
- } else {
- result.add(each);
- }
+ result.addAll(splitByPrimaryKey(scalingJob, dataSource, metaDataManager, each));
}
return result;
}
@@ -88,30 +86,59 @@ public final class InventoryTaskSplitter {
dumperConfig.getTableNameMap().forEach((key, value) -> {
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
inventoryDumperConfig.setTableName(key);
- inventoryDumperConfig.setPositionManager(new PositionManager(new PlaceholderPosition()));
+ inventoryDumperConfig.setPosition(new PlaceholderPosition());
result.add(inventoryDumperConfig);
});
return result;
}
- private boolean isSpiltByPrimaryKeyRange(final InventoryDumperConfiguration inventoryDumperConfig, final MetaDataManager metaDataManager) {
- TableMetaData tableMetaData = metaDataManager.getTableMetaData(inventoryDumperConfig.getTableName());
+ private Collection<InventoryDumperConfiguration> splitByPrimaryKey(
+ final ScalingJob scalingJob, final DataSource dataSource, final MetaDataManager metaDataManager, final InventoryDumperConfiguration dumperConfig) {
+ Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+ Collection<Position<?>> inventoryPositions = getInventoryPositions(scalingJob, dumperConfig, dataSource, metaDataManager);
+ int i = 0;
+ for (Position<?> inventoryPosition : inventoryPositions) {
+ InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfig);
+ splitDumperConfig.setPosition(inventoryPosition);
+ splitDumperConfig.setShardingItem(i++);
+ splitDumperConfig.setTableName(dumperConfig.getTableName());
+ splitDumperConfig.setPrimaryKey(dumperConfig.getPrimaryKey());
+ result.add(splitDumperConfig);
+ }
+ return result;
+ }
+
+ private Collection<Position<?>> getInventoryPositions(
+ final ScalingJob scalingJob, final InventoryDumperConfiguration dumperConfig, final DataSource dataSource, final MetaDataManager metaDataManager) {
+ if (null != scalingJob.getInitPosition()) {
+ return scalingJob.getInitPosition().getInventoryPosition(dumperConfig.getTableName()).values();
+ }
+ if (isSpiltByPrimaryKeyRange(metaDataManager, dumperConfig.getTableName())) {
+ String primaryKey = metaDataManager.getTableMetaData(dumperConfig.getTableName()).getPrimaryKeyColumns().get(0);
+ dumperConfig.setPrimaryKey(primaryKey);
+ return getPositionByPrimaryKeyRange(scalingJob, dataSource, dumperConfig);
+ }
+ return Lists.newArrayList(new PlaceholderPosition());
+ }
+
+ private boolean isSpiltByPrimaryKeyRange(final MetaDataManager metaDataManager, final String tableName) {
+ TableMetaData tableMetaData = metaDataManager.getTableMetaData(tableName);
if (null == tableMetaData) {
- log.warn("Can't split range for table {}, reason: can not get table metadata ", inventoryDumperConfig.getTableName());
+ log.warn("Can't split range for table {}, reason: can not get table metadata ", tableName);
return false;
}
List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
if (null == primaryKeys || primaryKeys.isEmpty()) {
- log.warn("Can't split range for table {}, reason: no primary key", inventoryDumperConfig.getTableName());
+ log.warn("Can't split range for table {}, reason: no primary key", tableName);
return false;
}
if (primaryKeys.size() > 1) {
- log.warn("Can't split range for table {}, reason: primary key is union primary", inventoryDumperConfig.getTableName());
+ log.warn("Can't split range for table {}, reason: primary key is union primary", tableName);
return false;
}
int index = tableMetaData.findColumnIndex(primaryKeys.get(0));
if (isNotIntegerPrimary(tableMetaData.getColumnMetaData(index).getDataType())) {
- log.warn("Can't split range for table {}, reason: primary key is not integer number", inventoryDumperConfig.getTableName());
+ log.warn("Can't split range for table {}, reason: primary key is not integer number", tableName);
return false;
}
return true;
@@ -121,36 +148,28 @@ public final class InventoryTaskSplitter {
return Types.INTEGER != columnType && Types.BIGINT != columnType && Types.SMALLINT != columnType && Types.TINYINT != columnType;
}
- private Collection<InventoryDumperConfiguration> splitByPrimaryKeyRange(
- final String databaseType, final int shardingSize, final InventoryDumperConfiguration inventoryDumperConfig, final MetaDataManager metaDataManager, final DataSource dataSource) {
- Collection<InventoryDumperConfiguration> result = new LinkedList<>();
- String tableName = inventoryDumperConfig.getTableName();
- String primaryKey = metaDataManager.getTableMetaData(tableName).getPrimaryKeyColumns().get(0);
- ScalingSQLBuilder scalingSqlBuilder = ScalingSQLBuilderFactory.newInstance(databaseType);
- inventoryDumperConfig.setPrimaryKey(primaryKey);
+ private Collection<Position<?>> getPositionByPrimaryKeyRange(final ScalingJob scalingJob, final DataSource dataSource, final InventoryDumperConfiguration dumperConfig) {
+ Collection<Position<?>> result = new ArrayList<>();
+ String sql = ScalingSQLBuilderFactory.newInstance(scalingJob.getScalingConfig().getJobConfiguration().getDatabaseType())
+ .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(), dumperConfig.getPrimaryKey());
try (Connection connection = dataSource.getConnection();
- PreparedStatement ps = connection.prepareStatement(scalingSqlBuilder.buildSplitByPrimaryKeyRangeSQL(tableName, primaryKey))) {
+ PreparedStatement ps = connection.prepareStatement(sql)) {
long beginId = 0;
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ps.setLong(1, beginId);
- ps.setLong(2, shardingSize);
+ ps.setLong(2, scalingJob.getScalingConfig().getJobConfiguration().getShardingSize());
try (ResultSet rs = ps.executeQuery()) {
rs.next();
long endId = rs.getLong(1);
if (endId == 0) {
break;
}
- InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(inventoryDumperConfig);
- splitDumperConfig.setPositionManager(new PositionManager(new PrimaryKeyPosition(beginId, endId)));
- splitDumperConfig.setShardingItem(i);
- splitDumperConfig.setPrimaryKey(primaryKey);
- splitDumperConfig.setTableName(tableName);
- result.add(splitDumperConfig);
+ result.add(new PrimaryKeyPosition(beginId, endId));
beginId = endId + 1;
}
}
} catch (final SQLException ex) {
- throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", inventoryDumperConfig.getTableName(), primaryKey), ex);
+ throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfig.getTableName(), dumperConfig.getPrimaryKey()), ex);
}
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
index 68344f3..426889d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.task;
import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
/**
* Scaling task interface.
@@ -34,11 +34,11 @@ public interface ScalingTask extends ScalingExecutor {
TaskProgress getProgress();
/**
- * Get position manager.
+ * Get position.
*
- * @return position manager
+ * @return position
*/
- PositionManager getPositionManager();
+ Position<?> getPosition();
/**
* Get task id.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
index a5aa5de..788c0bc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTask.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.job.task.incremental;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
@@ -33,6 +34,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.TaskProgress;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import java.util.ArrayList;
@@ -57,6 +59,12 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
private Dumper dumper;
+ @Getter
+ private final String taskId;
+
+ @Getter
+ private Position<?> position;
+
private long delayMillisecond = Long.MAX_VALUE;
public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig) {
@@ -64,13 +72,13 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
this.dumperConfig = dumperConfig;
this.importerConfig = importerConfig;
dataSourceManager = new DataSourceManager();
- setTaskId(dumperConfig.getDataSourceName());
- setPositionManager(dumperConfig.getPositionManager());
+ taskId = dumperConfig.getDataSourceName();
+ position = dumperConfig.getPosition();
}
@Override
public void start() {
- dumper = DumperFactory.newInstanceLogDumper(dumperConfig, getPositionManager().getPosition());
+ dumper = DumperFactory.newInstanceLogDumper(dumperConfig, position);
Collection<Importer> importers = instanceImporters();
instanceChannel(importers);
Future<?> future = ScalingContext.getInstance().getIncrementalDumperExecuteEngine().submitAll(importers, new ExecuteCallback() {
@@ -102,7 +110,7 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
Record lastHandledRecord = records.get(records.size() - 1);
if (!(lastHandledRecord.getPosition() instanceof PlaceholderPosition)) {
- getPositionManager().setPosition(lastHandledRecord.getPosition());
+ position = lastHandledRecord.getPosition();
}
delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
});
@@ -117,7 +125,7 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
future.get();
} catch (final InterruptedException ignored) {
} catch (final ExecutionException ex) {
- throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), ex.getCause());
+ throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", taskId), ex.getCause());
}
}
@@ -131,6 +139,6 @@ public final class IncrementalTask extends AbstractScalingExecutor implements Sc
@Override
public TaskProgress getProgress() {
- return new IncrementalTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getPosition());
+ return new IncrementalTaskProgress(taskId, delayMillisecond, position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java
index 1ecc14e..1c88d19 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTask.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.job.task.inventory;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
@@ -34,6 +35,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.TaskProgress;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import java.util.Optional;
@@ -54,6 +56,12 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
private Dumper dumper;
+ @Getter
+ private final String taskId;
+
+ @Getter
+ private Position<?> position;
+
public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig) {
this(inventoryDumperConfig, importerConfig, new DataSourceManager());
}
@@ -62,8 +70,8 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
this.inventoryDumperConfig = inventoryDumperConfig;
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
- setTaskId(generateTaskId(inventoryDumperConfig));
- setPositionManager(inventoryDumperConfig.getPositionManager());
+ taskId = generateTaskId(inventoryDumperConfig);
+ position = inventoryDumperConfig.getPosition();
}
private String generateTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
@@ -100,7 +108,7 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
private void instanceChannel(final Importer importer) {
MemoryChannel channel = new MemoryChannel(records -> {
Optional<Record> record = records.stream().filter(each -> !(each.getPosition() instanceof PlaceholderPosition)).reduce((a, b) -> b);
- record.ifPresent(value -> getPositionManager().setPosition(value.getPosition()));
+ record.ifPresent(value -> position = value.getPosition());
});
dumper.setChannel(channel);
importer.setChannel(channel);
@@ -111,7 +119,7 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
future.get();
} catch (final InterruptedException ignored) {
} catch (final ExecutionException ex) {
- throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), ex.getCause());
+ throw new ScalingTaskExecuteException(String.format("Task %s execute failed ", taskId), ex.getCause());
}
}
@@ -125,6 +133,6 @@ public final class InventoryTask extends AbstractScalingExecutor implements Scal
@Override
public TaskProgress getProgress() {
- return new InventoryTaskProgress(getTaskId(), getPositionManager().getPosition() instanceof FinishedPosition);
+ return new InventoryTaskProgress(taskId, position instanceof FinishedPosition);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 125717d..f16b71f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -62,7 +62,6 @@ public final class ScalingTaskScheduler implements Runnable {
log.info("stop incremental task {} - {}", scalingJob.getJobId(), each.getTaskId());
each.stop();
}
- scalingJob.getResumeBreakPointManager().close();
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 247895c..32658fe 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
/**
@@ -46,11 +46,11 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
Class<? extends LogDumper> getLogDumperClass();
/**
- * Get position manager type.
+ * Get position initializer type.
*
- * @return position manager type
+ * @return position initializer type
*/
- Class<? extends PositionManager> getPositionManager();
+ Class<? extends PositionInitializer> getPositionInitializer();
/**
* Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
index 3202cd5..160f686 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
/**
@@ -36,14 +36,14 @@ public final class RdbmsConfigurationUtil {
* @return SQL where condition
*/
public static String getWhereCondition(final InventoryDumperConfiguration inventoryDumperConfig) {
- return getWhereCondition(inventoryDumperConfig.getPrimaryKey(), inventoryDumperConfig.getPositionManager());
+ return getWhereCondition(inventoryDumperConfig.getPrimaryKey(), inventoryDumperConfig.getPosition());
}
- private static String getWhereCondition(final String primaryKey, final PositionManager positionManager) {
- if (null == primaryKey || null == positionManager) {
+ private static String getWhereCondition(final String primaryKey, final Position<?> position) {
+ if (null == primaryKey || null == position) {
return "";
}
- PrimaryKeyPosition position = (PrimaryKeyPosition) positionManager.getPosition();
- return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, position.getBeginValue(), position.getEndValue());
+ PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) position;
+ return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, primaryKeyPosition.getBeginValue(), primaryKeyPosition.getEndValue());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
index 078f0a9..5c9b954 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
@@ -17,9 +17,13 @@
package org.apache.shardingsphere.scaling.core.utils;
+import com.google.common.base.Preconditions;
+
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
@@ -153,4 +157,16 @@ public final class ReflectionUtil {
method.setAccessible(true);
method.invoke(target, parameterValues);
}
+
+ /**
+ * Get interface generic class.
+ *
+ * @param clazz class
+ * @return generic class
+ */
+ public static Class<?> getInterfaceGenericClass(final Class<?> clazz) {
+ Type[] types = clazz.getGenericInterfaces();
+ Preconditions.checkState(types.length == 1, "Only supported one generic type");
+ return (Class<?>) ((ParameterizedType) types[0]).getActualTypeArguments()[0];
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java
new file mode 100644
index 0000000..2eff56d
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingConfigurationUtil.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ConfigurationYamlConverter;
+import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.algorithm.sharding.inline.InlineExpressionParser;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Scaling configuration util.
+ */
+public final class ScalingConfigurationUtil {
+
+ private static final SnowflakeKeyGenerateAlgorithm ID_AUTO_INCREASE_GENERATOR = initIdAutoIncreaseGenerator();
+
+ private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
+ SnowflakeKeyGenerateAlgorithm result = new SnowflakeKeyGenerateAlgorithm();
+ result.init();
+ return result;
+ }
+
+ private static Long generateKey() {
+ return (Long) ID_AUTO_INCREASE_GENERATOR.generateKey();
+ }
+
+ /**
+ * Fill in properties for scaling config.
+ *
+ * @param scalingConfig scaling config
+ */
+ public static void fillInProperties(final ScalingConfiguration scalingConfig) {
+ JobConfiguration jobConfig = scalingConfig.getJobConfiguration();
+ if (null == jobConfig.getJobId()) {
+ jobConfig.setJobId(generateKey());
+ }
+ if (Strings.isNullOrEmpty(jobConfig.getDatabaseType())) {
+ jobConfig.setDatabaseType(scalingConfig.getRuleConfiguration().getSource().unwrap().getDatabaseType().getName());
+ }
+ if (null == scalingConfig.getJobConfiguration().getShardingTables()) {
+ jobConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(scalingConfig)));
+ }
+ }
+
+ private static List<String> getShouldScalingActualDataNodes(final ScalingConfiguration scalingConfig) {
+ ScalingDataSourceConfiguration sourceConfig = scalingConfig.getRuleConfiguration().getSource().unwrap();
+ Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
+ "Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
+ ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
+ if (!(scalingConfig.getRuleConfiguration().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
+ return getShardingRuleConfigMap(source.getRule()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
+ }
+ ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
+ return getShouldScalingActualDataNodes(getModifiedDataSources(source.getDataSource(), target.getDataSource()),
+ getShardingRuleConfigMap(source.getRule()), getShardingRuleConfigMap(target.getRule()));
+ }
+
+ private static List<String> getShouldScalingActualDataNodes(final Set<String> modifiedDataSources,
+ final Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
+ final Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
+ List<String> result = new ArrayList<>();
+ newShardingRuleConfigMap.keySet().forEach(each -> {
+ if (!oldShardingRuleConfigMap.containsKey(each)) {
+ return;
+ }
+ List<String> oldActualDataNodes = new InlineExpressionParser(oldShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
+ List<String> newActualDataNodes = new InlineExpressionParser(newShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
+ if (!CollectionUtils.isEqualCollection(oldActualDataNodes, newActualDataNodes) || includeModifiedDataSources(newActualDataNodes, modifiedDataSources)) {
+ result.add(oldShardingRuleConfigMap.get(each).getActualDataNodes());
+ }
+ });
+ return result;
+ }
+
+ private static Set<String> getModifiedDataSources(final String oldConfig, final String newConfig) {
+ Set<String> result = new HashSet<>();
+ Map<String, String> oldDataSourceUrlMap = getDataSourceUrlMap(oldConfig);
+ Map<String, String> newDataSourceUrlMap = getDataSourceUrlMap(newConfig);
+ newDataSourceUrlMap.forEach((key, value) -> {
+ if (!value.equals(oldDataSourceUrlMap.get(key))) {
+ result.add(key);
+ }
+ });
+ return result;
+ }
+
+ private static Map<String, String> getDataSourceUrlMap(final String configuration) {
+ Map<String, String> result = new HashMap<>();
+ ConfigurationYamlConverter.loadDataSourceConfigs(configuration).forEach((key, value) -> {
+ JdbcUri uri = new JdbcUri(value.getProps().getOrDefault("url", value.getProps().get("jdbcUrl")).toString());
+ result.put(key, String.format("%s/%s", uri.getHost(), uri.getDatabase()));
+ });
+ return result;
+ }
+
+ private static boolean includeModifiedDataSources(final List<String> actualDataNodes, final Set<String> modifiedDataSources) {
+ return actualDataNodes.stream().anyMatch(each -> modifiedDataSources.contains(each.split("\\.")[0]));
+ }
+
+ private static Map<String, ShardingTableRuleConfiguration> getShardingRuleConfigMap(final String configuration) {
+ ShardingRuleConfiguration oldShardingRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(configuration);
+ return oldShardingRuleConfig.getTables().stream().collect(Collectors.toMap(ShardingTableRuleConfiguration::getLogicTable, Function.identity()));
+ }
+
+ private static String[] groupByDataSource(final List<String> actualDataNodeList) {
+ List<String> result = new ArrayList<>();
+ Multimap<String, String> multiMap = getNodeMultiMap(actualDataNodeList);
+ for (String key : multiMap.keySet()) {
+ List<String> list = new ArrayList<>();
+ for (String value : multiMap.get(key)) {
+ list.add(String.format("%s.%s", key, value));
+ }
+ result.add(String.join(",", list));
+ }
+ return result.toArray(new String[0]);
+ }
+
+ private static Multimap<String, String> getNodeMultiMap(final List<String> actualDataNodeList) {
+ Multimap<String, String> result = HashMultimap.create();
+ for (String actualDataNodes : actualDataNodeList) {
+ for (String actualDataNode : actualDataNodes.split(",")) {
+ String[] nodeArray = split(actualDataNode);
+ for (String dataSource : new InlineExpressionParser(nodeArray[0]).splitAndEvaluate()) {
+ result.put(dataSource, nodeArray[1]);
+ }
+ }
+ }
+ return result;
+ }
+
+ private static String[] split(final String actualDataNode) {
+ boolean flag = true;
+ int i = 0;
+ for (; i < actualDataNode.length(); i++) {
+ char each = actualDataNode.charAt(i);
+ if (each == '{') {
+ flag = false;
+ } else if (each == '}') {
+ flag = true;
+ } else if (flag && each == '.') {
+ break;
+ }
+ }
+ return new String[]{actualDataNode.substring(0, i), actualDataNode.substring(i + 1)};
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index b15dec6..5597050 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -38,7 +38,7 @@ public final class ScalingTaskUtil {
* @return is finished
*/
public static boolean allInventoryTasksFinished(final Collection<ScalingTask> inventoryTasks) {
- return inventoryTasks.stream().allMatch(each -> ((InventoryTask) each).getPositionManager().getPosition() instanceof FinishedPosition);
+ return inventoryTasks.stream().allMatch(each -> ((InventoryTask) each).getPosition() instanceof FinishedPosition);
}
/**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index f834cbc..2effedc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -39,8 +39,8 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
- return FixturePositionManager.class;
+ public Class<? extends PositionInitializer> getPositionInitializer() {
+ return FixturePositionInitializer.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
similarity index 77%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
index 93421b4..ac58683 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixturePositionInitializer.java
@@ -18,17 +18,14 @@
package org.apache.shardingsphere.scaling.core.fixture;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import javax.sql.DataSource;
-public final class FixturePositionManager extends PositionManager {
+public final class FixturePositionInitializer implements PositionInitializer<PlaceholderPosition> {
- public FixturePositionManager(final DataSource dataSource) {
- super(new PlaceholderPosition());
- }
-
- public FixturePositionManager(final String position) {
- super(new PlaceholderPosition());
+ @Override
+ public PlaceholderPosition init(final DataSource dataSource) {
+ return new PlaceholderPosition();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureResumeBreakPointManager.java
deleted file mode 100644
index b808073..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureResumeBreakPointManager.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.fixture;
-
-import org.apache.shardingsphere.scaling.core.job.position.resume.AbstractResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-
-public final class FixtureResumeBreakPointManager extends AbstractResumeBreakPointManager implements ResumeBreakPointManager {
-
- public FixtureResumeBreakPointManager(final String databaseType, final String taskPath) {
- super(databaseType, taskPath);
- }
-
- @Override
- public String getPosition(final String path) {
- return null;
- }
-
- @Override
- public void persistPosition(final String path, final String data) {
-
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
deleted file mode 100644
index e06bbbb..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.After;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class AbstractResumeBreakPointManagerTest {
-
- private AbstractResumeBreakPointManager resumeBreakPointManager;
-
- @Test
- public void assertResumeInventoryPosition() {
- Map<String, String> dataMap = new HashMap<>();
- dataMap.put("/base/inventory", "{\"unfinished\":{\"ds1.t_order_1#0\":[0,200],\"ds0.t_order_1#0\":[0,100],\"ds0.t_order_2\":[]},\"finished\":[\"ds0.t_order_1#1\"]}");
- resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
- resumeBreakPointManager.resumeInventoryPosition("");
- assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), is(0));
- resumeBreakPointManager.resumeInventoryPosition("/base/inventory");
- assertThat(resumeBreakPointManager.getInventoryPositionManagerMap().size(), is(4));
- }
-
- @Test
- public void assertResumeIncrementalPosition() {
- Map<String, String> dataMap = new HashMap<>();
- dataMap.put("/base/incremental", "{\"ds0\":[],\"ds1\":[]}");
- resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
- resumeBreakPointManager.resumeIncrementalPosition("");
- assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), is(0));
- resumeBreakPointManager.resumeIncrementalPosition("/base/incremental");
- assertThat(resumeBreakPointManager.getIncrementalPositionManagerMap().size(), is(2));
- }
-
- @Test
- public void assertPersistInventoryPosition() {
- Map<String, String> dataMap = new HashMap<>();
- dataMap.put("/base/inventory", "{\"unfinished\":{\"ds0.t_order_2#0\":[],\"ds0.t_order_1#0\":[0,100]},\"finished\":[\"ds0.t_order_1#1\"]}");
- dataMap.put("/base/incremental", "{}");
- resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PositionManager(new PrimaryKeyPosition(0L, 100L)));
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PositionManager(new FinishedPosition()));
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2#0", new PositionManager(new PlaceholderPosition()));
- resumeBreakPointManager.persistPosition();
- }
-
- @Test
- public void assertPersistIncrementalPosition() {
- Map<String, String> dataMap = new HashMap<>();
- dataMap.put("/base/inventory", "{\"unfinished\":{},\"finished\":[]}");
- dataMap.put("/base/incremental", "{\"ds0\":[],\"ds1\":[]}");
- resumeBreakPointManager = mockResumeBreakPointManager(dataMap);
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", new PositionManager(new PlaceholderPosition()));
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", new PositionManager(new PlaceholderPosition()));
- resumeBreakPointManager.persistPosition();
- }
-
- @SneakyThrows(ReflectiveOperationException.class)
- private AbstractResumeBreakPointManager mockResumeBreakPointManager(final Map<String, String> dataMap) {
- AbstractResumeBreakPointManager result = new AbstractResumeBreakPointManager("H2", "/base") {
-
- @Override
- public String getPosition(final String path) {
- return dataMap.get(path);
- }
-
- @Override
- public void persistPosition(final String path, final String data) {
- assertThat(data, is(dataMap.get(path)));
- }
- };
- ReflectionUtil.setFieldValue(AbstractResumeBreakPointManager.class, result, "inventoryPositionManagerMap", new TreeMap<String, PositionManager>());
- ReflectionUtil.setFieldValue(AbstractResumeBreakPointManager.class, result, "incrementalPositionManagerMap", new TreeMap<String, PositionManager>());
- return result;
- }
-
- @After
- public void tearDown() {
- resumeBreakPointManager.close();
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManagerTest.java
deleted file mode 100644
index 1a29ee9..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/FileSystemResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class FileSystemResumeBreakPointManagerTest {
-
- private FileSystemResumeBreakPointManager resumeBreakPointManager;
-
- @Before
- public void setUp() {
- resumeBreakPointManager = new FileSystemResumeBreakPointManager("H2", "target/.scaling");
- }
-
- @Test
- public void assertPersistAndGetPosition() {
- resumeBreakPointManager.persistPosition();
- assertThat(resumeBreakPointManager.getPosition("target/.scaling/incremental"), is("{}"));
- assertThat(resumeBreakPointManager.getPosition("target/.scaling/inventory"), is("{\"unfinished\":{},\"finished\":[]}"));
- }
-
- @After
- @SneakyThrows(IOException.class)
- public void tearDown() {
- resumeBreakPointManager.close();
- FileUtils.forceDeleteOnExit(new File("target/.scaling"));
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManagerTest.java
deleted file mode 100644
index a477b48..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/RegistryRepositoryResumeBreakPointManagerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.position.resume;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public final class RegistryRepositoryResumeBreakPointManagerTest {
-
- private RegistryRepositoryResumeBreakPointManager resumeBreakPointManager;
-
- @Before
- public void setUp() {
- ScalingContext.getInstance().init(mockServerConfiguration());
- resumeBreakPointManager = new RegistryRepositoryResumeBreakPointManager("H2", "/base");
- }
-
- @Test
- public void assertPersistAndGetPosition() {
- resumeBreakPointManager.persistPosition();
- assertThat(resumeBreakPointManager.getPosition("/base/incremental"), is("{}"));
- assertThat(resumeBreakPointManager.getPosition("/base/inventory"), is("{\"unfinished\":{},\"finished\":[]}"));
- }
-
- @After
- public void tearDown() {
- resumeBreakPointManager.close();
- resetRegistryRepositoryAvailable();
- }
-
- private ServerConfiguration mockServerConfiguration() {
- resetRegistryRepositoryAvailable();
- ServerConfiguration result = new ServerConfiguration();
- result.setDistributedScalingService(new GovernanceConfiguration("test", new GovernanceCenterConfiguration("REG_FIXTURE", "", null), false));
- return result;
- }
-
- @SneakyThrows(ReflectiveOperationException.class)
- private void resetRegistryRepositoryAvailable() {
- ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
deleted file mode 100644
index a8cd2bd..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/ScalingPositionResumerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
-import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
-import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public final class ScalingPositionResumerTest {
-
- private ScalingJob scalingJob;
-
- private ScalingPositionResumer scalingPositionResumer;
-
- @Before
- public void setUp() {
- ScalingContext.getInstance().init(new ServerConfiguration());
- scalingJob = mockScalingJob();
- scalingPositionResumer = new ScalingPositionResumer();
- }
-
- @Test
- public void assertResumePosition() {
- ResumeBreakPointManager resumeBreakPointManager = ResumeBreakPointManagerFactory.newInstance("MySQL", "/scalingTest/position/0");
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new PositionManager(new PrimaryKeyPosition(0, 100)));
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", new PositionManager(new PlaceholderPosition()));
- scalingPositionResumer.resumePosition(scalingJob, new DataSourceManager(), resumeBreakPointManager);
- assertThat(scalingJob.getIncrementalTasks().size(), is(1));
- assertTrue(scalingJob.getInventoryTasks().isEmpty());
- }
-
- @Test
- public void assertPersistPosition() {
- ResumeBreakPointManager resumeBreakPointManager = mock(ResumeBreakPointManager.class);
- scalingPositionResumer.persistPosition(scalingJob, resumeBreakPointManager);
- verify(resumeBreakPointManager).persistPosition();
- }
-
- @SneakyThrows(IOException.class)
- private ScalingJob mockScalingJob() {
- return ScalingConfigurationUtil.initJob("/config.json");
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
index b48cc91..8d7f27f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
@@ -17,20 +17,23 @@
package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
+import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.sql.DataSource;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -51,7 +54,7 @@ public final class InventoryTaskSplitterTest {
private static final String PASSWORD = "password";
- private static final String DATABASE_TYPE = "H2";
+ private ScalingJob scalingJob;
private TaskConfiguration taskConfig;
@@ -61,9 +64,7 @@ public final class InventoryTaskSplitterTest {
@Before
public void setUp() {
- DumperConfiguration dumperConfig = mockDumperConfig();
- ImporterConfiguration importerConfig = new ImporterConfiguration();
- taskConfig = new TaskConfiguration(new JobConfiguration(), dumperConfig, importerConfig);
+ scalingJob = mockScalingJob();
dataSourceManager = new DataSourceManager();
inventoryTaskSplitter = new InventoryTaskSplitter();
}
@@ -77,17 +78,17 @@ public final class InventoryTaskSplitterTest {
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
taskConfig.getJobConfig().setShardingSize(10);
initIntPrimaryEnvironment(taskConfig.getDumperConfig());
- List<ScalingTask> actual = (List<ScalingTask>) inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+ List<ScalingTask> actual = (List<ScalingTask>) inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(10));
- assertThat(((PrimaryKeyPosition) actual.get(9).getPositionManager().getPosition()).getBeginValue(), is(91L));
- assertThat(((PrimaryKeyPosition) actual.get(9).getPositionManager().getPosition()).getEndValue(), is(100L));
+ assertThat(((PrimaryKeyPosition) actual.get(9).getPosition()).getBeginValue(), is(91L));
+ assertThat(((PrimaryKeyPosition) actual.get(9).getPosition()).getEndValue(), is(100L));
}
@Test
public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
initCharPrimaryEnvironment(taskConfig.getDumperConfig());
- Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+ Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -95,7 +96,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
- Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+ Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -103,7 +104,7 @@ public final class InventoryTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
initNoPrimaryEnvironment(taskConfig.getDumperConfig());
- Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(DATABASE_TYPE, taskConfig, dataSourceManager);
+ Collection<ScalingTask> actual = inventoryTaskSplitter.splitInventoryData(scalingJob, taskConfig, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -150,6 +151,15 @@ public final class InventoryTaskSplitterTest {
}
}
+ @SneakyThrows(IOException.class)
+ private ScalingJob mockScalingJob() {
+ ScalingJob result = ScalingConfigurationUtil.initJob("/config.json");
+ result.getScalingConfig().getJobConfiguration().setDatabaseType("H2");
+ result.getScalingConfig().getJobConfiguration().setShardingSize(10);
+ taskConfig = new TaskConfiguration(result.getScalingConfig().getJobConfiguration(), mockDumperConfig(), new ImporterConfiguration());
+ return result;
+ }
+
private DumperConfiguration mockDumperConfig() {
ScalingDataSourceConfiguration dataSourceConfig = new StandardJDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
DumperConfiguration result = new DumperConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java
index 7ca5243..93e53bd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskTest.java
@@ -19,13 +19,12 @@ package org.apache.shardingsphere.scaling.core.job.task.incremental;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.job.TaskProgress;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.task.DefaultScalingTaskFactory;
import org.junit.After;
import org.junit.Before;
@@ -82,7 +81,7 @@ public final class IncrementalTaskTest {
Map<String, String> tableMap = new HashMap<>(1, 1);
tableMap.put("t_order", "t_order");
result.setTableNameMap(tableMap);
- result.setPositionManager(new PositionManager(new PlaceholderPosition()));
+ result.setPosition(new PlaceholderPosition());
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
index dd588b2..34f5fea 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskTest.java
@@ -21,14 +21,13 @@ import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.ScalingTaskExecuteException;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.junit.After;
import org.junit.Before;
@@ -81,7 +80,7 @@ public final class InventoryTaskTest {
initTableData(taskConfig.getDumperConfig());
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(taskConfig.getDumperConfig());
inventoryDumperConfig.setTableName("t_order");
- inventoryDumperConfig.setPositionManager(taskConfig.getDumperConfig().getPositionManager());
+ inventoryDumperConfig.setPosition(taskConfig.getDumperConfig().getPosition());
InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(), dataSourceManager);
inventoryTask.start();
assertFalse(((InventoryTaskProgress) inventoryTask.getProgress()).isFinished());
@@ -101,7 +100,7 @@ public final class InventoryTaskTest {
ScalingDataSourceConfiguration dataSourceConfig = new StandardJDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
DumperConfiguration result = new DumperConfiguration();
result.setDataSourceConfig(dataSourceConfig);
- result.setPositionManager(new PositionManager(new PrimaryKeyPosition(1, 100)));
+ result.setPosition(new PrimaryKeyPosition(1, 100));
result.setTableNameMap(Collections.emptyMap());
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
index 3b9d490..dd8b98c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobServiceTest.java
@@ -25,18 +25,14 @@ import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourc
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.execute.engine.TaskExecuteEngine;
-import org.apache.shardingsphere.scaling.core.fixture.FixtureResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
-import org.apache.shardingsphere.scaling.core.job.position.resume.FileSystemResumeBreakPointManager;
-import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -75,7 +71,6 @@ public final class StandaloneScalingJobServiceTest {
public void setUp() {
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "inventoryDumperExecuteEngine", mock(TaskExecuteEngine.class));
- ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FixtureResumeBreakPointManager.class);
}
@Test
@@ -163,10 +158,4 @@ public final class StandaloneScalingJobServiceTest {
return resultSet.getLong(1);
}
}
-
- @After
- @SneakyThrows(ReflectiveOperationException.class)
- public void tearDown() {
- ReflectionUtil.setStaticFieldValue(ResumeBreakPointManagerFactory.class, "clazz", FileSystemResumeBreakPointManager.class);
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index 092b4df..8eeffad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.mysql.component.MySQLBinlogDumper;
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.scaling.mysql.component.MySQLDataConsistencyChe
import org.apache.shardingsphere.scaling.mysql.component.MySQLDataSourceChecker;
import org.apache.shardingsphere.scaling.mysql.component.MySQLImporter;
import org.apache.shardingsphere.scaling.mysql.component.MySQLJdbcDumper;
-import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionManager;
+import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionInitializer;
import org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder;
/**
@@ -49,8 +49,8 @@ public final class MySQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
- return MySQLPositionManager.class;
+ public Class<? extends PositionInitializer> getPositionInitializer() {
+ return MySQLPositionInitializer.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
similarity index 63%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
index a52defa..b4eae3e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializer.java
@@ -17,10 +17,7 @@
package org.apache.shardingsphere.scaling.mysql.component;
-import com.google.common.base.Preconditions;
-import com.google.gson.Gson;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
import javax.sql.DataSource;
@@ -30,33 +27,16 @@ import java.sql.ResultSet;
import java.sql.SQLException;
/**
- * MySQL binlog position manager.
+ * MySQL binlog position initializer.
*/
-public final class MySQLPositionManager extends PositionManager {
-
- public MySQLPositionManager(final DataSource dataSource) {
- super(dataSource);
- initPosition();
- }
-
- public MySQLPositionManager(final String position) {
- super(new Gson().fromJson(position, BinlogPosition.class));
- }
+public final class MySQLPositionInitializer implements PositionInitializer<BinlogPosition> {
@Override
- public BinlogPosition getPosition() {
- Position<?> position = super.getPosition();
- Preconditions.checkState(null != position, "Unknown position.");
- return (BinlogPosition) position;
- }
-
- private void initPosition() {
- try (Connection connection = getDataSource().getConnection()) {
- BinlogPosition position = getBinlogPosition(connection);
- position.setServerId(getServerId(connection));
- setPosition(position);
- } catch (final SQLException ex) {
- throw new RuntimeException("init position failed.", ex);
+ public BinlogPosition init(final DataSource dataSource) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ BinlogPosition result = getBinlogPosition(connection);
+ result.setServerId(getServerId(connection));
+ return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java
index a41ae62..5627109 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntryTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.scaling.mysql.component.MySQLDataConsistencyChe
import org.apache.shardingsphere.scaling.mysql.component.MySQLDataSourceChecker;
import org.apache.shardingsphere.scaling.mysql.component.MySQLImporter;
import org.apache.shardingsphere.scaling.mysql.component.MySQLJdbcDumper;
-import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionManager;
+import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionInitializer;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -37,7 +37,7 @@ public final class MySQLScalingEntryTest {
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType("MySQL");
assertTrue(scalingEntry instanceof MySQLScalingEntry);
- assertThat(scalingEntry.getPositionManager(), equalTo(MySQLPositionManager.class));
+ assertThat(scalingEntry.getPositionInitializer(), equalTo(MySQLPositionInitializer.class));
assertThat(scalingEntry.getDataSourceCheckerClass(), equalTo(MySQLDataSourceChecker.class));
assertThat(scalingEntry.getImporterClass(), equalTo(MySQLImporter.class));
assertThat(scalingEntry.getJdbcDumperClass(), equalTo(MySQLJdbcDumper.class));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializerTest.java
similarity index 75%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializerTest.java
index 6ef60ca..4c0fae9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLPositionInitializerTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.mysql.component;
-import com.google.gson.Gson;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
import org.junit.Before;
import org.junit.Test;
@@ -37,7 +36,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class MySQLPositionManagerTest {
+public final class MySQLPositionInitializerTest {
private static final String LOG_FILE_NAME = "binlog-000001";
@@ -61,30 +60,14 @@ public final class MySQLPositionManagerTest {
}
@Test
- public void assertGetCurrentPosition() {
- MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
- BinlogPosition actual = mysqlPositionManager.getPosition();
+ public void assertGetCurrentPosition() throws SQLException {
+ MySQLPositionInitializer mySQLPositionInitializer = new MySQLPositionInitializer();
+ BinlogPosition actual = mySQLPositionInitializer.init(dataSource);
assertThat(actual.getServerId(), is(SERVER_ID));
assertThat(actual.getFilename(), is(LOG_FILE_NAME));
assertThat(actual.getPosition(), is(LOG_POSITION));
}
- @Test
- public void assertInitPositionByJson() {
- MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(new Gson().toJson(new BinlogPosition(LOG_FILE_NAME, LOG_POSITION)));
- BinlogPosition actual = mysqlPositionManager.getPosition();
- assertThat(actual.getFilename(), is(LOG_FILE_NAME));
- assertThat(actual.getPosition(), is(LOG_POSITION));
- }
-
- @Test
- public void assertUpdateCurrentPosition() {
- MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
- BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME, LOG_POSITION, SERVER_ID, 0);
- mysqlPositionManager.setPosition(expected);
- assertThat(mysqlPositionManager.getPosition(), is(expected));
- }
-
private PreparedStatement mockPositionStatement() throws SQLException {
PreparedStatement result = mock(PreparedStatement.class);
ResultSet resultSet = mock(ResultSet.class);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index 6ddec40..e0b0297 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -22,14 +22,14 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataConsistencyChecker;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataSourceChecker;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLImporter;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLJdbcDumper;
-import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionManager;
+import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionInitializer;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLScalingSQLBuilder;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLWalDumper;
@@ -49,8 +49,8 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
- return PostgreSQLPositionManager.class;
+ public Class<? extends PositionInitializer> getPositionInitializer() {
+ return PostgreSQLPositionInitializer.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
similarity index 74%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
index 0e92caa..11e84db 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
@@ -17,9 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql.component;
-import com.google.common.base.Preconditions;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.util.PSQLException;
@@ -31,9 +29,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
/**
- * PostgreSQL wal position manager.
+ * PostgreSQL wal position initializer.
*/
-public final class PostgreSQLPositionManager extends PositionManager {
+public final class PostgreSQLPositionInitializer implements PositionInitializer<WalPosition> {
public static final String SLOT_NAME = "sharding_scaling";
@@ -41,28 +39,11 @@ public final class PostgreSQLPositionManager extends PositionManager {
public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
- public PostgreSQLPositionManager(final DataSource dataSource) {
- super(dataSource);
- initPosition();
- }
-
- public PostgreSQLPositionManager(final String position) {
- super(new WalPosition(LogSequenceNumber.valueOf(Long.parseLong(position))));
- }
-
@Override
- public WalPosition getPosition() {
- Position<?> position = super.getPosition();
- Preconditions.checkState(null != position, "Unknown position.");
- return (WalPosition) position;
- }
-
- private void initPosition() {
- try (Connection connection = getDataSource().getConnection()) {
+ public WalPosition init(final DataSource dataSource) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
createIfNotExists(connection);
- setPosition(getWalPosition(connection));
- } catch (final SQLException ex) {
- throw new RuntimeException("init position failed.", ex);
+ return getWalPosition(connection);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
index 635c7d5..4e713af 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
@@ -75,7 +75,7 @@ public final class PostgreSQLWalDumper extends AbstractScalingExecutor implement
try {
Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
- PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
+ PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber());
while (isRunning()) {
ByteBuffer message = stream.readPending();
if (null == message) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java
index f9e95ad..d37c32e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntryTest.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataCons
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataSourceChecker;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLImporter;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLJdbcDumper;
-import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionManager;
+import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionInitializer;
import org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLWalDumper;
import org.junit.Test;
@@ -37,7 +37,7 @@ public final class PostgreSQLScalingEntryTest {
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType("PostgreSQL");
assertTrue(scalingEntry instanceof PostgreSQLScalingEntry);
- assertThat(scalingEntry.getPositionManager(), equalTo(PostgreSQLPositionManager.class));
+ assertThat(scalingEntry.getPositionInitializer(), equalTo(PostgreSQLPositionInitializer.class));
assertThat(scalingEntry.getDataSourceCheckerClass(), equalTo(PostgreSQLDataSourceChecker.class));
assertThat(scalingEntry.getDataConsistencyCheckerClass(), equalTo(PostgreSQLDataConsistencyChecker.class));
assertThat(scalingEntry.getImporterClass(), equalTo(PostgreSQLImporter.class));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
similarity index 79%
rename from shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
index d3b00c2..8b0e9a7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializerTest.java
@@ -39,7 +39,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLPositionManagerTest {
+public final class PostgreSQLPositionInitializerTest {
private static final String POSTGRESQL_96_LSN = "0/14EFDB8";
@@ -66,23 +66,17 @@ public final class PostgreSQLPositionManagerTest {
}
@Test
- public void assertInitPositionByJson() {
- WalPosition actual = new PostgreSQLPositionManager("100").getPosition();
- assertThat(actual.getLogSequenceNumber().asLong(), is(LogSequenceNumber.valueOf(100L).asLong()));
- }
-
- @Test
public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
- WalPosition actual = new PostgreSQLPositionManager(dataSource).getPosition();
+ WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
}
@Test
public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
- WalPosition actual = new PostgreSQLPositionManager(dataSource).getPosition();
+ WalPosition actual = new PostgreSQLPositionInitializer().init(dataSource);
assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
}
@@ -90,18 +84,7 @@ public final class PostgreSQLPositionManagerTest {
public void assertGetCurrentPositionThrowException() throws SQLException {
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
- new PostgreSQLPositionManager(dataSource).getPosition();
- }
-
- @Test
- @SneakyThrows(SQLException.class)
- public void assertUpdateCurrentPosition() {
- when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
- when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
- PostgreSQLPositionManager positionManager = new PostgreSQLPositionManager(dataSource);
- WalPosition expected = new WalPosition(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN));
- positionManager.setPosition(expected);
- assertThat(positionManager.getPosition(), is(expected));
+ new PostgreSQLPositionInitializer().init(dataSource);
}
@SneakyThrows(SQLException.class)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
index db1fb2b..107f76a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
@@ -18,9 +18,9 @@
package org.apache.shardingsphere.scaling.postgresql.component;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.exception.ScalingTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
@@ -86,7 +86,7 @@ public final class PostgreSQLWalDumperTest {
when(logicalReplication.createPgConnection(jdbcDataSourceConfig)).thenReturn(pgConnection);
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
when(pgConnection.getTimestampUtils()).thenReturn(null);
- when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionManager.SLOT_NAME, position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
+ when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
ByteBuffer data = ByteBuffer.wrap("table public.test: DELETE: data[integer]:1".getBytes());
when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new SQLException(""));
when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
index 79b9421..34a6824 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java
@@ -22,6 +22,7 @@ import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.scaling.core.api.JobSchedulerCenter;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
@@ -55,12 +56,14 @@ public final class ScalingElasticJob implements SimpleJob {
scalingConfig.getJobConfiguration().setShardingItem(shardingContext.getShardingItem());
scalingConfig.getJobConfiguration().setJobId(Long.valueOf(shardingContext.getJobName()));
scalingJob = SCALING_JOB_SERVICE.start(scalingConfig).orElse(null);
+ JobSchedulerCenter.addJob(scalingJob);
}
private void stopJob(final ShardingContext shardingContext) {
if (null != scalingJob) {
log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
SCALING_JOB_SERVICE.stop(scalingJob.getJobId());
+ JobSchedulerCenter.removeJob(scalingJob);
scalingJob = null;
}
}