You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by ki...@apache.org on 2020/07/27 10:01:31 UTC
[shardingsphere] branch master updated: Scaling job Breakpoint
resume ability. (#6460)
This is an automated email from the ASF dual-hosted git repository.
kimmking pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 90724a6 Scaling job Breakpoint resume ability. (#6460)
90724a6 is described below
commit 90724a69b598b20f9d18f9b733e1d17ef7326807
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Mon Jul 27 18:01:13 2020 +0800
Scaling job Breakpoint resume ability. (#6460)
* Scaling job Breakpoint resume ability.
* update shardingsphere-orchestration-repository-zookeeper-curator dependency
Co-authored-by: qiulu3 <Lucas209910>
---
.../user-manual/shardingsphere-scaling/usage.cn.md | 2 +-
.../user-manual/shardingsphere-scaling/usage.en.md | 2 +-
.../scaling/web/HttpServerHandler.java | 2 +-
.../scaling/fixture/FixtureH2ScalingEntry.java | 6 +-
...reNopLogManager.java => FixtureNopManager.java} | 12 +-
.../shardingsphere-scaling-core/pom.xml | 5 +
.../scaling/core/config/JobConfiguration.java | 4 +-
.../scaling/core/config/RdbmsConfiguration.java | 17 +--
.../ResumeConfiguration.java} | 15 ++-
.../scaling/core/config/ServerConfiguration.java | 5 +-
.../core/config/utils/RdbmsConfigurationUtil.java | 49 +++++++
.../core/config/utils/SyncConfigurationUtil.java | 11 +-
.../executor/AbstractShardingScalingExecutor.java | 9 ++
.../executor/channel/DistributionChannel.java | 12 +-
.../executor/dumper/AbstractJDBCDumper.java | 18 ++-
.../execute/executor/dumper/DumperFactory.java | 12 +-
.../execute/executor/importer/ImporterFactory.java | 8 +-
.../core/execute/executor/record/DataRecord.java | 6 +-
.../execute/executor/record/FinishedRecord.java | 6 +-
.../execute/executor/record/PlaceholderRecord.java | 6 +-
.../core/execute/executor/record/Record.java | 4 +-
.../scaling/core/job/ShardingScalingJob.java | 2 +
.../{NopLogPosition.java => NopPosition.java} | 14 +-
.../position/{LogPosition.java => Position.java} | 12 +-
...ogPositionManager.java => PositionManager.java} | 11 +-
...gerFactory.java => PositionManagerFactory.java} | 25 +++-
.../core/job/position/PrimaryKeyPosition.java | 87 ++++++++++++
...osition.java => PrimaryKeyPositionManager.java} | 18 ++-
.../resume/AbstractResumablePositionManager.java | 150 +++++++++++++++++++++
.../resume/FakeResumablePositionManager.java} | 24 +---
.../position/resume/ResumablePositionManager.java | 68 ++++++++++
.../ResumablePositionManagerFactory.java} | 29 ++--
.../resume/ZookeeperResumablePositionManager.java | 104 ++++++++++++++
.../job/preparer/ShardingScalingJobPreparer.java | 64 +++++----
.../job/preparer/resumer/SyncPositionResumer.java | 140 +++++++++++++++++++
.../splitter/InventoryDataTaskSplitter.java | 18 ++-
.../core/job/preparer/utils/JobPrepareUtil.java | 52 +++++++
.../core/job/task/DefaultSyncTaskFactory.java | 9 +-
.../scaling/core/job/task/ScalingTask.java | 15 +++
.../scaling/core/job/task/SyncTaskFactory.java | 10 +-
.../incremental/IncrementalDataScalingTask.java | 30 ++---
.../IncrementalDataSyncTaskProgress.java | 4 +-
.../task/inventory/InventoryDataScalingTask.java | 42 +++---
.../inventory/InventoryDataScalingTaskGroup.java | 7 +-
.../core/schedule/ScalingTaskScheduler.java | 4 +
.../scaling/core/spi/ScalingEntry.java | 8 +-
.../core/config/RdbmsConfigurationTest.java | 10 +-
.../importer/AbstractJDBCImporterTest.java | 11 +-
.../executor/importer/AbstractSqlBuilderTest.java | 4 +-
.../core/fixture/FixtureH2ScalingEntry.java | 4 +-
.../preparer/resumer/SyncPositionResumerTest.java | 114 ++++++++++++++++
.../InventoryDataScalingTaskGroupTest.java | 2 +-
.../inventory/InventoryDataScalingTaskTest.java | 3 +
.../scaling/mysql/BinlogPosition.java | 23 +++-
.../scaling/mysql/MySQLBinlogDumper.java | 8 +-
.../scaling/mysql/MySQLImporter.java | 2 +-
...itionManager.java => MySQLPositionManager.java} | 25 ++--
.../scaling/mysql/MySQLScalingEntry.java | 6 +-
.../AbstractResumablePositionManagerTest.java | 76 +++++++++++
...agerTest.java => MySQLPositionManagerTest.java} | 12 +-
.../scaling/postgresql/PostgreSQLImporter.java | 2 +-
...Manager.java => PostgreSQLPositionManager.java} | 22 +--
.../scaling/postgresql/PostgreSQLScalingEntry.java | 6 +-
.../scaling/postgresql/PostgreSQLWalDumper.java | 8 +-
.../scaling/postgresql/WalPosition.java | 15 ++-
...est.java => PostgreSQLPositionManagerTest.java} | 20 +--
66 files changed, 1251 insertions(+), 280 deletions(-)
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index 54d5810..1407028 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -149,7 +149,7 @@ curl -X GET \
"realTimeSyncTaskProgress": {
"id": "realtime-test",
"delayMillisecond": 1576563771372,
- "logPosition": {
+ "position": {
"filename": "ON.000007",
"position": 177532875,
"serverId": 0
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 221943b..9e420af 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -149,7 +149,7 @@ Response:
"realTimeSyncTaskProgress": {
"id": "realtime-test",
"delayMillisecond": 1576563771372,
- "logPosition": {
+ "position": {
"filename": "ON.000007",
"position": 177532875,
"serverId": 0
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 3ee8f6e..10902ee 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -88,7 +88,7 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
private void startJob(final ChannelHandlerContext channelHandlerContext, final String requestBody) {
ScalingConfiguration scalingConfiguration = GSON.fromJson(requestBody, ScalingConfiguration.class);
- ShardingScalingJob shardingScalingJob = new ShardingScalingJob("Local Sharding Scaling Job");
+ ShardingScalingJob shardingScalingJob = new ShardingScalingJob(scalingConfiguration.getJobConfiguration().getJobName(), scalingConfiguration.getJobConfiguration().getShardingItem());
shardingScalingJob.getSyncConfigurations().addAll(SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration));
log.info("start job : {}", requestBody);
SCALING_JOB_CONTROLLER.start(shardingScalingJob);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index e45fb7c..a322495 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.fixture;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
@@ -37,8 +37,8 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends LogPositionManager> getLogPositionManager() {
- return FixtureNopLogManager.class;
+ public Class<? extends PositionManager> getPositionManager() {
+ return FixtureNopManager.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopLogManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
similarity index 72%
rename from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopLogManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
index 81bf56e..0a30ccc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopLogManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
@@ -18,22 +18,22 @@
package org.apache.shardingsphere.scaling.fixture;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import javax.sql.DataSource;
@RequiredArgsConstructor
-public final class FixtureNopLogManager implements LogPositionManager<NopLogPosition> {
+public final class FixtureNopManager implements PositionManager<NopPosition> {
private final DataSource dataSource;
@Override
- public NopLogPosition getCurrentPosition() {
- return new NopLogPosition();
+ public NopPosition getCurrentPosition() {
+ return new NopPosition();
}
@Override
- public void updateCurrentPosition(final NopLogPosition newLogPosition) {
+ public void updateCurrentPosition(final NopPosition newPosition) {
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index f6cb56e..5e234ce 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -41,6 +41,11 @@
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-orchestration-repository-zookeeper-curator</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-infra-executor</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index de8046c..ca34fc5 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.config;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@@ -25,7 +24,6 @@ import lombok.Setter;
/**
* Job configuration.
*/
-@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@@ -35,6 +33,8 @@ public final class JobConfiguration {
private int retryTimes = 3;
+ private String jobName;
+
private String[] shardingTables;
private int shardingItem;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
index e8798d8..4468982 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
@@ -21,6 +21,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import java.util.Map;
import java.util.Set;
@@ -33,15 +34,19 @@ import java.util.Set;
@EqualsAndHashCode
public final class RdbmsConfiguration implements Cloneable {
+ private String dataSourceName;
+
private DataSourceConfiguration dataSourceConfiguration;
private String tableName;
private Map<String, Set<String>> shardingColumnsMap;
- private String whereCondition;
+ private String primaryKey;
+
+ private PositionManager positionManager;
- private int spiltNum;
+ private Integer spiltNum;
private Map<String, String> tableNameMap;
@@ -58,12 +63,4 @@ public final class RdbmsConfiguration implements Cloneable {
return (RdbmsConfiguration) origin.clone();
}
- /**
- * Get where condition.
- *
- * @return "" if whereCondition is null, otherwise whereCondition
- */
- public String getWhereCondition() {
- return null == whereCondition ? "" : whereCondition;
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ResumeConfiguration.java
similarity index 76%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ResumeConfiguration.java
index fb17e65..a543769 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ResumeConfiguration.java
@@ -15,12 +15,19 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.position;
+package org.apache.shardingsphere.scaling.core.config;
-import java.io.Serializable;
+import lombok.Getter;
+import lombok.Setter;
/**
- * Log position interface.
+ * Resume configuration.
*/
-public interface LogPosition<T> extends Comparable<T>, Serializable {
+@Getter
+@Setter
+public final class ResumeConfiguration {
+
+ private String serverLists;
+
+ private String namespace;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
index 992640f..d36099c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ServerConfiguration.java
@@ -17,10 +17,9 @@
package org.apache.shardingsphere.scaling.core.config;
-import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
-
import lombok.Getter;
import lombok.Setter;
+import org.apache.shardingsphere.infra.yaml.config.YamlConfiguration;
/**
* Global server configuration.
@@ -36,4 +35,6 @@ public final class ServerConfiguration implements YamlConfiguration {
private int pushTimeout = 1000;
private int workerThread = 30;
+
+ private ResumeConfiguration resumeConfiguration;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
new file mode 100644
index 0000000..60ddf20
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.config.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+
+/**
+ * Rdbms configuration Util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class RdbmsConfigurationUtil {
+
+ /**
+ * Get sql where condition whit primary key.
+ *
+ * @param rdbmsConfiguration rdbms configuration
+ * @return sql where condition
+ */
+ public static String getWhereCondition(final RdbmsConfiguration rdbmsConfiguration) {
+ return getWhereCondition(rdbmsConfiguration.getPrimaryKey(), rdbmsConfiguration.getPositionManager());
+ }
+
+ private static String getWhereCondition(final String primaryKey, final PositionManager<PrimaryKeyPosition> positionManager) {
+ if (null == primaryKey || null == positionManager) {
+ return "";
+ }
+ PrimaryKeyPosition position = positionManager.getCurrentPosition();
+ return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, position.getBeginValue(), position.getEndValue());
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
index f5975d0..d2c26b4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/SyncConfigurationUtil.java
@@ -62,12 +62,11 @@ public final class SyncConfigurationUtil {
ShardingRuleConfiguration sourceRule = ConfigurationYamlConverter.loadShardingRuleConfiguration(scalingConfiguration.getRuleConfiguration().getSourceRule());
Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(sourceRule, sourceDatasource.keySet());
filterByShardingDataSourceTables(dataSourceTableNameMap, scalingConfiguration.getJobConfiguration());
- for (String each : dataSourceTableNameMap.keySet()) {
- RdbmsConfiguration dumperConfiguration = createDumperConfiguration(sourceDatasource.get(each));
+ for (Map.Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
+ RdbmsConfiguration dumperConfiguration = createDumperConfiguration(entry.getKey(), sourceDatasource.get(entry.getKey()));
dumperConfiguration.setRetryTimes(scalingConfiguration.getJobConfiguration().getRetryTimes());
RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule);
- Map<String, String> tableNameMap = dataSourceTableNameMap.get(each);
- result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), tableNameMap, dumperConfiguration, importerConfiguration));
+ result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), entry.getValue(), dumperConfiguration, importerConfiguration));
}
return result;
}
@@ -147,8 +146,9 @@ public final class SyncConfigurationUtil {
}
}
- private static RdbmsConfiguration createDumperConfiguration(final DataSourceConfiguration dataSourceConfiguration) {
+ private static RdbmsConfiguration createDumperConfiguration(final String dataSourceName, final DataSourceConfiguration dataSourceConfiguration) {
RdbmsConfiguration result = new RdbmsConfiguration();
+ result.setDataSourceName(dataSourceName);
Map<String, Object> dataSourceProperties = dataSourceConfiguration.getProps();
JDBCDataSourceConfiguration dumperDataSourceConfiguration = new JDBCDataSourceConfiguration(
dataSourceProperties.containsKey("jdbcUrl") ? dataSourceProperties.get("jdbcUrl").toString() : dataSourceProperties.get("url").toString(),
@@ -189,5 +189,4 @@ public final class SyncConfigurationUtil {
}
return Collections.EMPTY_SET;
}
-
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
index dcefcb5..5fddd2c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
/**
* Abstract sharding scaling executor.
@@ -29,6 +30,14 @@ public abstract class AbstractShardingScalingExecutor implements ShardingScaling
@Setter(AccessLevel.PROTECTED)
@Getter(AccessLevel.PROTECTED)
private boolean running;
+
+ @Setter
+ @Getter
+ private String taskId;
+
+ @Setter
+ @Getter
+ private PositionManager positionManager;
/**
* Generic start implement.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java
index a661acf..f3ab05e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.java
@@ -17,11 +17,11 @@
package org.apache.shardingsphere.scaling.core.execute.executor.channel;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import java.util.HashMap;
import java.util.LinkedList;
@@ -55,7 +55,7 @@ public final class DistributionChannel implements Channel {
private final Queue<Record> toBeAcknowledgeRecords = new ConcurrentLinkedQueue<>();
- private final Map<LogPosition, Record> pendingAcknowledgeRecords = new ConcurrentHashMap<>();
+ private final Map<Position, Record> pendingAcknowledgeRecords = new ConcurrentHashMap<>();
private ScheduledExecutorService scheduleAckRecordsExecutor;
@@ -78,10 +78,10 @@ public final class DistributionChannel implements Channel {
List<Record> result = new LinkedList<>();
while (!toBeAcknowledgeRecords.isEmpty()) {
Record record = toBeAcknowledgeRecords.peek();
- if (pendingAcknowledgeRecords.containsKey(record.getLogPosition())) {
+ if (pendingAcknowledgeRecords.containsKey(record.getPosition())) {
result.add(record);
toBeAcknowledgeRecords.poll();
- pendingAcknowledgeRecords.remove(record.getLogPosition());
+ pendingAcknowledgeRecords.remove(record.getPosition());
} else {
break;
}
@@ -107,7 +107,7 @@ public final class DistributionChannel implements Channel {
channels.get(index).pushRecord(dataRecord);
} else if (PlaceholderRecord.class.equals(record.getClass())) {
toBeAcknowledgeRecords.add(record);
- pendingAcknowledgeRecords.put(record.getLogPosition(), record);
+ pendingAcknowledgeRecords.put(record.getPosition(), record);
} else {
throw new RuntimeException("Not Support Record Type");
}
@@ -161,7 +161,7 @@ public final class DistributionChannel implements Channel {
@Override
public void onAck(final List<Record> records) {
for (Record record : records) {
- pendingAcknowledgeRecords.put(record.getLogPosition(), record);
+ pendingAcknowledgeRecords.put(record.getPosition(), record);
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index b68039d..8cc8b55 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -23,15 +23,17 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
@@ -80,12 +82,12 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
@Override
public final void dump(final Channel channel) {
try (Connection conn = dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()).getConnection()) {
- String sql = String.format("SELECT * FROM %s %s", rdbmsConfiguration.getTableName(), rdbmsConfiguration.getWhereCondition());
+ String sql = String.format("SELECT * FROM %s %s", rdbmsConfiguration.getTableName(), RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration));
PreparedStatement ps = createPreparedStatement(conn, sql);
ResultSet rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
while (isRunning() && rs.next()) {
- DataRecord record = new DataRecord(new NopLogPosition(), metaData.getColumnCount());
+ DataRecord record = new DataRecord(newPrimaryKeyPosition(rs), metaData.getColumnCount());
record.setType("BOOTSTRAP-INSERT");
record.setTableName(rdbmsConfiguration.getTableNameMap().get(rdbmsConfiguration.getTableName()));
for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -93,15 +95,23 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
}
pushRecord(record);
}
+ pushRecord(new FinishedRecord(new PrimaryKeyPosition.FinishedPosition()));
} catch (SQLException e) {
stop();
channel.close();
throw new SyncTaskExecuteException(e);
} finally {
- pushRecord(new FinishedRecord(new NopLogPosition()));
+ pushRecord(new FinishedRecord(new NopPosition()));
}
}
+ private PrimaryKeyPosition newPrimaryKeyPosition(final ResultSet rs) throws SQLException {
+ if (null == rdbmsConfiguration.getPrimaryKey()) {
+ return new PrimaryKeyPosition.PlaceholderPosition();
+ }
+ return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getCurrentPosition()).getEndValue());
+ }
+
/**
* Create prepared statement.
*
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
index 125fa99..20ae77b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
@@ -21,7 +21,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
@@ -62,11 +62,11 @@ public final class DumperFactory {
* New instance of log dumper.
*
* @param rdbmsConfiguration rdbms configuration
- * @param position log position
+ * @param position position
* @return log dumper
*/
@SneakyThrows
- public static LogDumper newInstanceLogDumper(final RdbmsConfiguration rdbmsConfiguration, final LogPosition position) {
+ public static LogDumper newInstanceLogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) {
return newInstanceLogDumper(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, position);
}
@@ -75,12 +75,12 @@ public final class DumperFactory {
*
* @param databaseType database type
* @param rdbmsConfiguration rdbms configuration
- * @param position log position
+ * @param position position
* @return log dumper
*/
@SneakyThrows
- public static LogDumper newInstanceLogDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final LogPosition position) {
+ public static LogDumper newInstanceLogDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final Position position) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getLogDumperClass().getConstructor(RdbmsConfiguration.class, LogPosition.class).newInstance(rdbmsConfiguration, position);
+ return scalingEntry.getLogDumperClass().getConstructor(RdbmsConfiguration.class, Position.class).newInstance(rdbmsConfiguration, position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
index 8167a5b..c7a8e54 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
/**
* Importer factory.
@@ -32,7 +32,7 @@ public final class ImporterFactory {
* New instance of importer.
*
* @param rdbmsConfiguration rdbms configuration
- * @param dataSourceManager data source factory
+ * @param dataSourceManager data source factory
* @return importer
*/
public static Importer newInstance(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
@@ -42,9 +42,9 @@ public final class ImporterFactory {
/**
* New instance of importer.
*
- * @param databaseType database type
+ * @param databaseType database type
* @param rdbmsConfiguration rdbms configuration
- * @param dataSourceManager data source factory
+ * @param dataSourceManager data source factory
* @return importer
*/
@SneakyThrows
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
index e3097e3..bed71b2 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.core.execute.executor.record;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import lombok.EqualsAndHashCode;
import lombok.Getter;
@@ -43,8 +43,8 @@ public final class DataRecord extends Record {
private String tableName;
- public DataRecord(final LogPosition logPosition, final int columnCount) {
- super(logPosition);
+ public DataRecord(final Position position, final int columnCount) {
+ super(position);
columns = new ArrayList<>(columnCount);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java
index 2f10769..9d92680 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/FinishedRecord.java
@@ -17,14 +17,14 @@
package org.apache.shardingsphere.scaling.core.execute.executor.record;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
/**
* Finished record.
*/
public final class FinishedRecord extends Record {
- public FinishedRecord(final LogPosition logPosition) {
- super(logPosition);
+ public FinishedRecord(final Position position) {
+ super(position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java
index 30f642f..86d4e26 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/PlaceholderRecord.java
@@ -17,14 +17,14 @@
package org.apache.shardingsphere.scaling.core.execute.executor.record;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
/**
* Placeholder record.
*/
public final class PlaceholderRecord extends Record {
- public PlaceholderRecord(final LogPosition logPosition) {
- super(logPosition);
+ public PlaceholderRecord(final Position position) {
+ super(position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java
index 62a21cd..1bd6723 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/Record.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.core.execute.executor.record;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@@ -30,7 +30,7 @@ import lombok.Setter;
@Setter
public abstract class Record {
- private final LogPosition logPosition;
+ private final Position position;
private long commitTime;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
index b1fc2db..5fb6dc6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
@@ -48,5 +48,7 @@ public final class ShardingScalingJob {
private final String jobName;
+ private final int shardingItem;
+
private String status = "RUNNING";
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
similarity index 76%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
index 7675f26..850c276 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
@@ -17,15 +17,23 @@
package org.apache.shardingsphere.scaling.core.job.position;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
/**
- * Nop log position.
+ * Nop position.
*/
-public final class NopLogPosition implements LogPosition<NopLogPosition> {
+public final class NopPosition implements Position<NopPosition> {
private static final long serialVersionUID = 1946907178847169020L;
@Override
- public int compareTo(final NopLogPosition nopLogPosition) {
+ public int compareTo(final NopPosition nopPosition) {
return 0;
}
+
+ @Override
+ public JsonElement toJson() {
+ return new JsonObject();
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
similarity index 80%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
index fb17e65..b34e467 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
@@ -17,10 +17,18 @@
package org.apache.shardingsphere.scaling.core.job.position;
+import com.google.gson.JsonElement;
+
import java.io.Serializable;
/**
- * Log position interface.
+ * Position interface.
*/
-public interface LogPosition<T> extends Comparable<T>, Serializable {
+public interface Position<T> extends Comparable<T>, Serializable {
+
+ /**
+ * To json element.
+ * @return json element
+ */
+ JsonElement toJson();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
similarity index 82%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
index 085f03f..fac5976 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
@@ -20,20 +20,21 @@ package org.apache.shardingsphere.scaling.core.job.position;
/**
* Database itself data synchronize position manager.
* Such as mysql binlog, postgreSQL wal.
+ * Or use primary key as position.
*/
-public interface LogPositionManager<T extends LogPosition> {
+public interface PositionManager<T extends Position> {
/**
- * Get current log position.
+ * Get current position.
*
- * @return log position
+ * @return position
*/
T getCurrentPosition();
/**
* Update currentPosition.
*
- * @param newLogPosition new log position.
+ * @param newPosition new position.
*/
- void updateCurrentPosition(T newLogPosition);
+ void updateCurrentPosition(T newPosition);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
similarity index 61%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
index 6359b0f..fe0ed84 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
@@ -26,21 +26,34 @@ import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
import javax.sql.DataSource;
/**
- * Log manager factory.
+ * Position manager factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class LogPositionManagerFactory {
+public final class PositionManagerFactory {
/**
- * New instance of log manager.
+ * New instance of position manager.
*
* @param databaseType database type
* @param dataSource data source
- * @return log manager
+ * @return position manager
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static LogPositionManager newInstanceLogManager(final String databaseType, final DataSource dataSource) {
+ public static PositionManager newInstance(final String databaseType, final DataSource dataSource) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getLogPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+ return scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+ }
+
+ /**
+ * New instance of position manager.
+ *
+ * @param databaseType database type
+ * @param position position
+ * @return position manager
+ */
+ @SneakyThrows(ReflectiveOperationException.class)
+ public static PositionManager newInstance(final String databaseType, final String position) {
+ ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
+ return scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
new file mode 100644
index 0000000..f624855
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.List;
+
+/**
+ * Use primary key as position.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class PrimaryKeyPosition implements Position {
+
+ private static final Gson GSON = new Gson();
+
+ private long beginValue;
+
+ private long endValue;
+
+ @Override
+ public int compareTo(final Object position) {
+ if (null == position) {
+ return 1;
+ }
+ return Long.compare(beginValue, ((PrimaryKeyPosition) position).getBeginValue());
+ }
+
+ /**
+ * Transform primary key position from json to object.
+ *
+ * @param json json data
+ * @return primary key position
+ */
+ public static PrimaryKeyPosition fromJson(final String json) {
+ List<Double> values = GSON.fromJson(json, List.class);
+ if (values.size() == 2) {
+ return new PrimaryKeyPosition(values.get(0).longValue(), values.get(1).longValue());
+ }
+ return new PlaceholderPosition();
+ }
+
+ @Override
+ public JsonElement toJson() {
+ return GSON.toJsonTree(new long[]{beginValue, endValue});
+ }
+
+ /**
+ * Finish flag position for inventory task finished.
+ */
+ public static class FinishedPosition extends PrimaryKeyPosition {
+
+ }
+
+ /**
+ * Placeholder position for without primary key table.
+ */
+ public static class PlaceholderPosition extends PrimaryKeyPosition {
+
+ public PlaceholderPosition() {
+ super(-1, -1);
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
similarity index 67%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
index 7675f26..b101aaf 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopLogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
@@ -17,15 +17,23 @@
package org.apache.shardingsphere.scaling.core.job.position;
+import lombok.AllArgsConstructor;
+
/**
- * Nop log position.
+ * Primary key position manager.
*/
-public final class NopLogPosition implements LogPosition<NopLogPosition> {
+@AllArgsConstructor
+public final class PrimaryKeyPositionManager implements PositionManager<PrimaryKeyPosition> {
+
+ private PrimaryKeyPosition position;
- private static final long serialVersionUID = 1946907178847169020L;
+ @Override
+ public PrimaryKeyPosition getCurrentPosition() {
+ return position;
+ }
@Override
- public int compareTo(final NopLogPosition nopLogPosition) {
- return 0;
+ public void updateCurrentPosition(final PrimaryKeyPosition newPosition) {
+ position = newPosition;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
new file mode 100644
index 0000000..d524c13
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract resumable position manager.
+ */
+@Getter
+@Setter
+@Slf4j
+public abstract class AbstractResumablePositionManager implements ResumablePositionManager, Closeable {
+
+ private static final Gson GSON = new Gson();
+
+ private String databaseType;
+
+ private String taskPath;
+
+ private boolean available;
+
+ private boolean resumable;
+
+ private final Map<String, PositionManager<PrimaryKeyPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
+
+ private final Map<String, PositionManager> incrementalPositionManagerMap = Maps.newConcurrentMap();
+
+ @Override
+ public void persistInventoryPosition() {
+ }
+
+ @Override
+ public void persistIncrementalPosition() {
+ }
+
+ protected void resumeInventoryPosition(final String data) {
+ if (isEmpty(data)) {
+ return;
+ }
+ log.info("resume inventory position from {} = {}", taskPath, data);
+ InventoryPosition inventoryPosition = InventoryPosition.fromJson(data);
+ Map<String, PrimaryKeyPosition> unfinish = inventoryPosition.getUnfinish();
+ for (Map.Entry<String, PrimaryKeyPosition> entry : unfinish.entrySet()) {
+ getInventoryPositionManagerMap().put(entry.getKey(), new PrimaryKeyPositionManager(entry.getValue()));
+ }
+ for (String each : inventoryPosition.getFinished()) {
+ getInventoryPositionManagerMap().put(each, new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
+ }
+ }
+
+ protected void resumeIncrementalPosition(final String data) {
+ if (isEmpty(data)) {
+ return;
+ }
+ log.info("resume incremental position from {} = {}", taskPath, data);
+ Map<String, Object> incrementalPosition = GSON.fromJson(data, Map.class);
+ for (Map.Entry<String, Object> entry : incrementalPosition.entrySet()) {
+ getIncrementalPositionManagerMap().put(entry.getKey(), PositionManagerFactory.newInstance(databaseType, entry.getValue().toString()));
+ }
+ }
+
+ private boolean isEmpty(final String data) {
+ return null == data || "".equals(data);
+ }
+
+ protected String getInventoryPositionData() {
+ JsonObject result = new JsonObject();
+ JsonObject unfinish = new JsonObject();
+ Set<String> finished = Sets.newHashSet();
+ for (Map.Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionManagerMap().entrySet()) {
+ if (entry.getValue().getCurrentPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
+ finished.add(entry.getKey());
+ continue;
+ }
+ unfinish.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+ }
+ result.add("unfinish", unfinish);
+ result.add("finished", GSON.toJsonTree(finished));
+ return result.toString();
+ }
+
+ protected String getIncrementalPositionData() {
+ JsonObject result = new JsonObject();
+ for (Map.Entry<String, PositionManager> entry : getIncrementalPositionManagerMap().entrySet()) {
+ result.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+ }
+ return result.toString();
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Getter
+ @Setter
+ private static final class InventoryPosition {
+
+ private Map<String, PrimaryKeyPosition> unfinish;
+
+ private Set<String> finished;
+
+ /**
+ * Transform inventory position from json to object.
+ *
+ * @param data json data
+ * @return inventory position
+ */
+ public static InventoryPosition fromJson(final String data) {
+ InventoryPosition result = new InventoryPosition();
+ JsonObject json = JsonParser.parseString(data).getAsJsonObject();
+ Map<String, Object> unfinish = GSON.fromJson(json.getAsJsonObject("unfinish"), Map.class);
+ result.setUnfinish(unfinish.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> PrimaryKeyPosition.fromJson(entry.getValue().toString()))));
+ result.setFinished(GSON.fromJson(json.getAsJsonArray("finished"), Set.class));
+ return result;
+ }
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FakeResumablePositionManager.java
similarity index 68%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FakeResumablePositionManager.java
index de8046c..4a09625 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/FakeResumablePositionManager.java
@@ -15,28 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.job.position.resume;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
import lombok.NoArgsConstructor;
-import lombok.Setter;
/**
- * Job configuration.
+ * Fake resumable position manager as defalut.
*/
-@AllArgsConstructor
@NoArgsConstructor
-@Setter
-@Getter
-public final class JobConfiguration {
-
- private int concurrency = 3;
-
- private int retryTimes = 3;
-
- private String[] shardingTables;
-
- private int shardingItem;
+public final class FakeResumablePositionManager extends AbstractResumablePositionManager implements ResumablePositionManager {
+ public FakeResumablePositionManager(final String databaseType, final String taskPath) {
+ setDatabaseType(databaseType);
+ setTaskPath(taskPath);
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManager.java
new file mode 100644
index 0000000..df3afdc
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+
+import java.util.Map;
+
+/**
+ * Resumable position manager interface.
+ */
+public interface ResumablePositionManager {
+
+ /**
+ * If it is available.
+ * @return is available
+ */
+ boolean isAvailable();
+
+ /**
+ * If has resumable data.
+ * @return is resumable
+ */
+ boolean isResumable();
+
+ /**
+ * Get inventory position map.
+ * @return inventory position map
+ */
+ Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionManagerMap();
+
+ /**
+ * Get incremental position map.
+ * @return incremental position map
+ */
+ Map<String, PositionManager> getIncrementalPositionManagerMap();
+
+ /**
+ * Persist inventory position.
+ */
+ void persistInventoryPosition();
+
+ /**
+ * Persist incremental position.
+ */
+ void persistIncrementalPosition();
+
+ /**
+ * Close this manager.
+ */
+ void close();
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManagerFactory.java
similarity index 57%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManagerFactory.java
index 6359b0f..ff6f136 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/LogPositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumablePositionManagerFactory.java
@@ -15,32 +15,35 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.position;
+package org.apache.shardingsphere.scaling.core.job.position.resume;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-
-import javax.sql.DataSource;
/**
- * Log manager factory.
+ * Resumable position manager factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class LogPositionManagerFactory {
+public final class ResumablePositionManagerFactory {
+
+ private static Class<? extends ResumablePositionManager> clazz = FakeResumablePositionManager.class;
+
+ static {
+ if (new ZookeeperResumablePositionManager().isAvailable()) {
+ clazz = ZookeeperResumablePositionManager.class;
+ }
+ }
/**
- * New instance of log manager.
+ * New resumable position manager instance.
*
* @param databaseType database type
- * @param dataSource data source
- * @return log manager
+ * @param taskPath task path for persist data.
+ * @return resumable position manager
*/
@SneakyThrows(ReflectiveOperationException.class)
- public static LogPositionManager newInstanceLogManager(final String databaseType, final DataSource dataSource) {
- ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getLogPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+ public static ResumablePositionManager newInstance(final String databaseType, final String taskPath) {
+ return clazz.getConstructor(String.class, String.class).newInstance(databaseType, taskPath);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
new file mode 100644
index 0000000..5f2f045
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ZookeeperResumablePositionManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.orchestration.repository.api.config.OrchestrationCenterConfiguration;
+import org.apache.shardingsphere.orchestration.repository.zookeeper.CuratorZookeeperRepository;
+import org.apache.shardingsphere.scaling.core.config.ResumeConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Depends on zookeeper to manager position.
+ */
+@Slf4j
+public final class ZookeeperResumablePositionManager extends AbstractResumablePositionManager implements ResumablePositionManager {
+
+ private static final String INVENTORY = "/inventory";
+
+ private static final String INCREMENTAL = "/incremental";
+
+ private static final CuratorZookeeperRepository ZOOKEEPER = new CuratorZookeeperRepository();
+
+ private ScheduledExecutorService executor;
+
+ private String inventoryPath;
+
+ private String incrementalPath;
+
+ public ZookeeperResumablePositionManager() {
+ ResumeConfiguration resumeConfiguration = ScalingContext.getInstance().getServerConfiguration().getResumeConfiguration();
+ if (null != resumeConfiguration) {
+ ZOOKEEPER.init(getCenterConfiguration(resumeConfiguration));
+ log.info("zookeeper resumable position manager is available.");
+ setAvailable(true);
+ }
+ }
+
+ public ZookeeperResumablePositionManager(final String databaseType, final String taskPath) {
+ setDatabaseType(databaseType);
+ setTaskPath(taskPath);
+ this.inventoryPath = taskPath + INVENTORY;
+ this.incrementalPath = taskPath + INCREMENTAL;
+ resumePosition();
+ setResumable(!getInventoryPositionManagerMap().isEmpty() && !getIncrementalPositionManagerMap().isEmpty());
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleWithFixedDelay(this::persistPosition, 1, 1, TimeUnit.MINUTES);
+ }
+
+ private OrchestrationCenterConfiguration getCenterConfiguration(final ResumeConfiguration resumeConfiguration) {
+ OrchestrationCenterConfiguration centerConfiguration =
+ new OrchestrationCenterConfiguration("zookeeper", resumeConfiguration.getServerLists(), resumeConfiguration.getNamespace(), new Properties());
+ return centerConfiguration;
+ }
+
+ @Override
+ public void close() {
+ executor.submit(this::persistPosition);
+ executor.shutdown();
+ }
+
+ private void resumePosition() {
+ resumeInventoryPosition(ZOOKEEPER.get(inventoryPath));
+ resumeIncrementalPosition(ZOOKEEPER.get(incrementalPath));
+ }
+
+ private void persistPosition() {
+ persistIncrementalPosition();
+ persistInventoryPosition();
+ }
+
+ @Override
+ public void persistInventoryPosition() {
+ String result = getInventoryPositionData();
+ ZOOKEEPER.persist(inventoryPath, result);
+ log.info("persist inventory position {} = {}", inventoryPath, result);
+ }
+
+ @Override
+ public void persistIncrementalPosition() {
+ String result = getIncrementalPositionData();
+ ZOOKEEPER.persist(incrementalPath, result);
+ log.info("persist incremental position {} = {}", incrementalPath, result);
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index fd0f55a..e82f2ad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -17,27 +17,30 @@
package org.apache.shardingsphere.scaling.core.job.preparer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.scaling.core.schedule.SyncTaskControlStatus;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManagerFactory;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceCheckerCheckerFactory;
+import org.apache.shardingsphere.scaling.core.job.preparer.resumer.SyncPositionResumer;
import org.apache.shardingsphere.scaling.core.job.preparer.splitter.InventoryDataTaskSplitter;
+import org.apache.shardingsphere.scaling.core.job.preparer.utils.JobPrepareUtil;
import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.schedule.SyncTaskControlStatus;
import javax.sql.DataSource;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
/**
* Sharding scaling job preparer.
@@ -49,6 +52,8 @@ public final class ShardingScalingJobPreparer {
private final InventoryDataTaskSplitter inventoryDataTaskSplitter = new InventoryDataTaskSplitter();
+ private final SyncPositionResumer syncPositionResumer = new SyncPositionResumer();
+
/**
* Do prepare work for sharding scaling job.
*
@@ -58,14 +63,24 @@ public final class ShardingScalingJobPreparer {
String databaseType = shardingScalingJob.getSyncConfigurations().get(0).getDumperConfiguration().getDataSourceConfiguration().getDatabaseType().getName();
try (DataSourceManager dataSourceManager = new DataSourceManager(shardingScalingJob.getSyncConfigurations())) {
checkDatasources(databaseType, dataSourceManager);
+ ResumablePositionManager resumablePositionManager = getResumablePositionManager(databaseType, shardingScalingJob);
+ if (resumablePositionManager.isResumable()) {
+ syncPositionResumer.resumePosition(shardingScalingJob, dataSourceManager, resumablePositionManager);
+ return;
+ }
initIncrementalDataTasks(databaseType, shardingScalingJob, dataSourceManager);
- splitInventoryDataTasks(shardingScalingJob, dataSourceManager);
+ initInventoryDataTasks(shardingScalingJob, dataSourceManager);
+ syncPositionResumer.persistPosition(shardingScalingJob, resumablePositionManager);
} catch (PrepareFailedException ex) {
log.warn("Preparing sharding scaling job {} : {} failed", shardingScalingJob.getJobId(), shardingScalingJob.getJobName(), ex);
shardingScalingJob.setStatus(SyncTaskControlStatus.PREPARING_FAILURE.name());
}
}
+ private ResumablePositionManager getResumablePositionManager(final String databaseType, final ShardingScalingJob shardingScalingJob) {
+ return ResumablePositionManagerFactory.newInstance(databaseType, String.format("/%s/item-%d", shardingScalingJob.getJobName(), shardingScalingJob.getShardingItem()));
+ }
+
private void checkDatasources(final String databaseType, final DataSourceManager dataSourceManager) {
DataSourceChecker dataSourceChecker = DataSourceCheckerCheckerFactory.newInstanceDataSourceChecker(databaseType);
dataSourceChecker.checkConnection(dataSourceManager.getCachedDataSources().values());
@@ -73,35 +88,28 @@ public final class ShardingScalingJobPreparer {
dataSourceChecker.checkVariable(dataSourceManager.getSourceDatasources().values());
}
- private void splitInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
+ private void initInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
List<ScalingTask> allInventoryDataTasks = new LinkedList<>();
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(each, dataSourceManager));
}
- for (Collection<ScalingTask> each : groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+ for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
}
}
- private List<Collection<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
- List<Collection<ScalingTask>> result = new ArrayList<>(taskNumber);
- for (int i = 0; i < taskNumber; i++) {
- result.add(new LinkedList<>());
- }
- for (int i = 0; i < allInventoryDataTasks.size(); i++) {
- result.get(i % taskNumber).add(allInventoryDataTasks.get(i));
- }
- return result;
- }
-
private void initIncrementalDataTasks(final String databaseType, final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
- LogPositionManager logPositionManager = instanceLogPositionManager(databaseType, dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
- shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each, logPositionManager.getCurrentPosition()));
+ DataSourceConfiguration dataSourceConfiguration = each.getDumperConfiguration().getDataSourceConfiguration();
+ each.getDumperConfiguration().setPositionManager(instancePositionManager(databaseType, dataSourceManager.getDataSource(dataSourceConfiguration)));
+ shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
}
}
- private LogPositionManager instanceLogPositionManager(final String databaseType, final DataSource dataSource) {
- return LogPositionManagerFactory.newInstanceLogManager(databaseType, dataSource);
+ @SuppressWarnings("rawtypes")
+ private PositionManager instancePositionManager(final String databaseType, final DataSource dataSource) {
+ PositionManager positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
+ positionManager.getCurrentPosition();
+ return positionManager;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
new file mode 100644
index 0000000..c1704da
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
+
+import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
+import org.apache.shardingsphere.scaling.core.job.preparer.utils.JobPrepareUtil;
+import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
+import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Sync position resumer.
+ */
+public final class SyncPositionResumer {
+
+ private final SyncTaskFactory syncTaskFactory = new DefaultSyncTaskFactory();
+
+ /**
+ * Resume position from this position manager.
+ *
+ * @param shardingScalingJob sharding scaling job
+ * @param dataSourceManager dataSource manager
+ * @param resumablePositionManager which position manager resume from
+ */
+ public void resumePosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
+ resumeInventoryPosition(shardingScalingJob, dataSourceManager, resumablePositionManager);
+ resumeIncrementalPosition(shardingScalingJob, resumablePositionManager);
+ }
+
+ private void resumeInventoryPosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
+ List<ScalingTask> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumablePositionManager);
+ for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+ shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
+ }
+ }
+
+ private List<ScalingTask> getAllInventoryDataTasks(
+ final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumablePositionManager resumablePositionManager) {
+ List<ScalingTask> result = new LinkedList<>();
+ for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
+ MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
+ for (Map.Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumablePositionManager).entrySet()) {
+ result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each, metaDataManager, entry)));
+ }
+ }
+ return result;
+ }
+
+ private SyncConfiguration newSyncConfiguration(
+ final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Map.Entry<String, PositionManager<PrimaryKeyPosition>> entry) {
+ String[] splitTable = entry.getKey().split("#");
+ RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
+ splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
+ splitDumperConfig.setPositionManager(entry.getValue());
+ if (splitTable.length == 2) {
+ splitDumperConfig.setSpiltNum(Integer.parseInt(splitTable[1]));
+ }
+ splitDumperConfig.setPrimaryKey(metaDataManager.getTableMetaData(splitDumperConfig.getTableName()).getPrimaryKeyColumns().get(0));
+ return new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
+ splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
+ }
+
+ private Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionMap(
+ final RdbmsConfiguration dumperConfiguration, final ResumablePositionManager resumablePositionManager) {
+ Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName()));
+ return resumablePositionManager.getInventoryPositionManagerMap().entrySet().stream()
+ .filter(entry -> pattern.matcher(entry.getKey()).find())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumablePositionManager resumablePositionManager) {
+ for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
+ each.getDumperConfiguration().setPositionManager(resumablePositionManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
+ shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
+ }
+ }
+
+ /**
+ * Persist position when init sync job.
+ *
+ * @param shardingScalingJob sync job
+ * @param resumablePositionManager which position manager resume from
+ */
+ public void persistPosition(final ShardingScalingJob shardingScalingJob, final ResumablePositionManager resumablePositionManager) {
+ persistIncrementalPosition(shardingScalingJob.getIncrementalDataTasks(), resumablePositionManager);
+ persistInventoryPosition(shardingScalingJob.getInventoryDataTasks(), resumablePositionManager);
+ }
+
+ private void persistInventoryPosition(final List<ScalingTask> inventoryDataTasks, final ResumablePositionManager resumablePositionManager) {
+ for (ScalingTask each : inventoryDataTasks) {
+ if (each instanceof InventoryDataScalingTaskGroup) {
+ putInventoryDataScalingTask(((InventoryDataScalingTaskGroup) each).getScalingTasks(), resumablePositionManager);
+ }
+ }
+ resumablePositionManager.persistInventoryPosition();
+ }
+
+ private void putInventoryDataScalingTask(final Collection<ScalingTask> scalingTasks, final ResumablePositionManager resumablePositionManager) {
+ for (ScalingTask each : scalingTasks) {
+ resumablePositionManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
+ }
+ }
+
+ private void persistIncrementalPosition(final List<ScalingTask> incrementalDataTasks, final ResumablePositionManager resumablePositionManager) {
+ for (ScalingTask each : incrementalDataTasks) {
+ resumablePositionManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
+ }
+ resumablePositionManager.persistIncrementalPosition();
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index 883af8f..0012b01 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -22,11 +22,15 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
-import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -36,14 +40,14 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import javax.sql.DataSource;
-
/**
* Inventory data task splitter.
*/
@Slf4j
public final class InventoryDataTaskSplitter {
+ private final SyncTaskFactory syncTaskFactory = new DefaultSyncTaskFactory();
+
/**
* Split inventory data to multi-tasks.
*
@@ -54,7 +58,7 @@ public final class InventoryDataTaskSplitter {
public Collection<ScalingTask> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
Collection<ScalingTask> result = new LinkedList<>();
for (SyncConfiguration each : splitConfiguration(syncConfiguration, dataSourceManager)) {
- result.add(new InventoryDataScalingTask(each));
+ result.add(syncTaskFactory.createInventoryDataSyncTask(each));
}
return result;
}
@@ -78,6 +82,7 @@ public final class InventoryDataTaskSplitter {
for (String each : syncConfiguration.getTableNameMap().keySet()) {
RdbmsConfiguration dumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
dumperConfig.setTableName(each);
+ dumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
result.add(new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
dumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
}
@@ -116,6 +121,7 @@ public final class InventoryDataTaskSplitter {
Collection<SyncConfiguration> result = new LinkedList<>();
RdbmsConfiguration dumperConfiguration = syncConfiguration.getDumperConfiguration();
String primaryKey = metaDataManager.getTableMetaData(dumperConfiguration.getTableName()).getPrimaryKeyColumns().get(0);
+ dumperConfiguration.setPrimaryKey(primaryKey);
try (Connection connection = dataSource.getConnection()) {
PreparedStatement ps = connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT 1", primaryKey, primaryKey, dumperConfiguration.getTableName()));
ResultSet rs = ps.executeQuery();
@@ -126,10 +132,10 @@ public final class InventoryDataTaskSplitter {
for (int i = 0; i < concurrency && min <= max; i++) {
RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration);
if (i < concurrency - 1) {
- splitDumperConfig.setWhereCondition(String.format("WHERE %s BETWEEN %d AND %d", primaryKey, min, min + step));
+ splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, min + step)));
min = min + step + 1;
} else {
- splitDumperConfig.setWhereCondition(String.format("WHERE %s BETWEEN %d AND %d", primaryKey, min, max));
+ splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, max)));
}
splitDumperConfig.setSpiltNum(i);
result.add(new SyncConfiguration(concurrency, syncConfiguration.getTableNameMap(),
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
new file mode 100644
index 0000000..ca69ddc
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.preparer.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Scaling job prepare util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class JobPrepareUtil {
+
+ /**
+ * Group inventory data tasks by task number.
+ *
+ * @param taskNumber task number
+ * @param allInventoryDataTasks all inventory data tasks
+ * @return task group list
+ */
+ public static List<Collection<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
+ List<Collection<ScalingTask>> result = new ArrayList<>(taskNumber);
+ for (int i = 0; i < taskNumber; i++) {
+ result.add(new LinkedList<>());
+ }
+ for (int i = 0; i < allInventoryDataTasks.size(); i++) {
+ result.get(i % taskNumber).add(allInventoryDataTasks.get(i));
+ }
+ return result;
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
index 531c446..f9817b2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
@@ -17,14 +17,13 @@
package org.apache.shardingsphere.scaling.core.job.task;
-import java.util.Collection;
-
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
+import java.util.Collection;
+
/**
* Default sync task factory.
*/
@@ -41,7 +40,7 @@ public final class DefaultSyncTaskFactory implements SyncTaskFactory {
}
@Override
- public IncrementalDataScalingTask createIncrementalDataSyncTask(final SyncConfiguration syncConfiguration, final LogPosition logPosition) {
- return new IncrementalDataScalingTask(syncConfiguration, logPosition);
+ public IncrementalDataScalingTask createIncrementalDataSyncTask(final SyncConfiguration syncConfiguration) {
+ return new IncrementalDataScalingTask(syncConfiguration);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
index 320a0dc..66ba771 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.task;
import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
/**
* Sync task interface.
@@ -31,4 +32,18 @@ public interface ScalingTask extends ShardingScalingExecutor {
* @return migrate progress
*/
SyncProgress getProgress();
+
+ /**
+ * Get position manager.
+ *
+ * @return position manager
+ */
+ PositionManager getPositionManager();
+
+ /**
+ * Get task id.
+ *
+ * @return task id
+ */
+ String getTaskId();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
index ceb9f20..9df782c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
@@ -17,14 +17,13 @@
package org.apache.shardingsphere.scaling.core.job.task;
-import java.util.Collection;
-
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
+import java.util.Collection;
+
/**
* Sync task factory.
*/
@@ -33,7 +32,7 @@ public interface SyncTaskFactory {
/**
* Create inventory data sync task group.
*
- * @param inventoryDataScalingTasks inventory data sync tasks
+ * @param inventoryDataScalingTasks inventory data sync tasks
* @return inventory data sync task group
*/
InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(Collection<ScalingTask> inventoryDataScalingTasks);
@@ -50,8 +49,7 @@ public interface SyncTaskFactory {
* Create incremental data sync task.
*
* @param syncConfiguration sync configuration
- * @param logPosition log position of incremental data start
* @return incremental data sync task
*/
- IncrementalDataScalingTask createIncrementalDataSyncTask(SyncConfiguration syncConfiguration, LogPosition logPosition);
+ IncrementalDataScalingTask createIncrementalDataSyncTask(SyncConfiguration syncConfiguration);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 5edd960..c690e39 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -20,20 +20,18 @@ package org.apache.shardingsphere.scaling.core.job.task.incremental;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.DistributionChannel;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.ImporterFactory;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import java.util.ArrayList;
import java.util.Collection;
@@ -51,34 +49,29 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
private final DataSourceManager dataSourceManager;
- private final String syncTaskId;
-
- private LogPosition logPosition;
-
private Dumper dumper;
private long delayMillisecond;
- public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration, final LogPosition logPosition) {
+ public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration) {
this.syncConfiguration = syncConfiguration;
this.dataSourceManager = new DataSourceManager();
- this.logPosition = logPosition;
- DataSourceMetaData dataSourceMetaData = syncConfiguration.getDumperConfiguration().getDataSourceConfiguration().getDataSourceMetaData();
- syncTaskId = String.format("incremental-%s", null != dataSourceMetaData.getCatalog() ? dataSourceMetaData.getCatalog() : dataSourceMetaData.getSchema());
+ setTaskId(syncConfiguration.getDumperConfiguration().getDataSourceName());
+ setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
}
@Override
public void start() {
syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap());
- dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), logPosition);
+ dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getCurrentPosition());
Collection<Importer> importers = instanceImporters();
instanceChannel(importers);
Future future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
-
+
@Override
public void onSuccess() {
}
-
+
@Override
public void onFailure(final Throwable throwable) {
log.error("get an error when migrating the increment data", throwable);
@@ -101,7 +94,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
private void instanceChannel(final Collection<Importer> importers) {
DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
Record lastHandledRecord = records.get(records.size() - 1);
- logPosition = lastHandledRecord.getLogPosition();
+ getPositionManager().updateCurrentPosition(lastHandledRecord.getPosition());
delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
});
dumper.setChannel(channel);
@@ -115,7 +108,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
future.get();
} catch (InterruptedException ignored) {
} catch (ExecutionException e) {
- throw new SyncTaskExecuteException(String.format("Task %s execute failed ", syncTaskId), e.getCause());
+ throw new SyncTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), e.getCause());
}
}
@@ -129,6 +122,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
@Override
public SyncProgress getProgress() {
- return new IncrementalDataSyncTaskProgress(syncTaskId, delayMillisecond, logPosition);
+ return new IncrementalDataSyncTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getCurrentPosition());
}
+
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java
index 1e5bd06..7ba67b1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataSyncTaskProgress.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.job.task.incremental;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -34,5 +34,5 @@ public final class IncrementalDataSyncTaskProgress implements SyncProgress {
private final long delayMillisecond;
- private final LogPosition logPosition;
+ private final Position position;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 872f55b..ff2880b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -21,25 +21,27 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
+import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.ImporterFactory;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.infra.database.metadata.DataSourceMetaData;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
@@ -48,14 +50,12 @@ import java.util.concurrent.atomic.AtomicLong;
* Table slice execute task.
*/
@Slf4j
-public final class InventoryDataScalingTask implements ScalingTask {
+public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor implements ScalingTask {
private final SyncConfiguration syncConfiguration;
private final DataSourceManager dataSourceManager;
- private final String syncTaskId;
-
private long estimatedRows;
private final AtomicLong syncedRows = new AtomicLong();
@@ -69,13 +69,13 @@ public final class InventoryDataScalingTask implements ScalingTask {
public InventoryDataScalingTask(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
this.syncConfiguration = syncConfiguration;
this.dataSourceManager = dataSourceManager;
- syncTaskId = generateSyncTaskId(syncConfiguration.getDumperConfiguration());
+ setTaskId(generateSyncTaskId(syncConfiguration.getDumperConfiguration()));
+ setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
}
private String generateSyncTaskId(final RdbmsConfiguration dumperConfiguration) {
- DataSourceMetaData dataSourceMetaData = dumperConfiguration.getDataSourceConfiguration().getDataSourceMetaData();
- String result = String.format("inventory-%s-%s", Optional.ofNullable(dataSourceMetaData.getCatalog()).orElse(dataSourceMetaData.getSchema()), dumperConfiguration.getTableName());
- return null == dumperConfiguration.getWhereCondition() ? result : result + "#" + dumperConfiguration.getSpiltNum();
+ String result = String.format("%s.%s", dumperConfiguration.getDataSourceName(), dumperConfiguration.getTableName());
+ return null == dumperConfiguration.getSpiltNum() ? result : result + "#" + dumperConfiguration.getSpiltNum();
}
@Override
@@ -89,7 +89,7 @@ public final class InventoryDataScalingTask implements ScalingTask {
@Override
public void onSuccess() {
}
-
+
@Override
public void onFailure(final Throwable throwable) {
log.error("get an error when migrating the inventory data", throwable);
@@ -106,7 +106,7 @@ public final class InventoryDataScalingTask implements ScalingTask {
try (Connection connection = dataSource.getConnection()) {
ResultSet resultSet = connection.prepareStatement(String.format("SELECT COUNT(*) FROM %s %s",
syncConfiguration.getDumperConfiguration().getTableName(),
- syncConfiguration.getDumperConfiguration().getWhereCondition()))
+ RdbmsConfigurationUtil.getWhereCondition(syncConfiguration.getDumperConfiguration())))
.executeQuery();
resultSet.next();
estimatedRows = resultSet.getInt(1);
@@ -124,8 +124,12 @@ public final class InventoryDataScalingTask implements ScalingTask {
MemoryChannel channel = new MemoryChannel(records -> {
int count = 0;
for (Record record : records) {
- if (DataRecord.class.equals(record.getClass())) {
+ if (record instanceof DataRecord) {
count++;
+ } else if (record instanceof FinishedRecord) {
+ if (record.getPosition() instanceof PrimaryKeyPosition) {
+ getPositionManager().updateCurrentPosition(record.getPosition());
+ }
}
}
syncedRows.addAndGet(count);
@@ -139,7 +143,7 @@ public final class InventoryDataScalingTask implements ScalingTask {
future.get();
} catch (InterruptedException ignored) {
} catch (ExecutionException e) {
- throw new SyncTaskExecuteException(String.format("Task %s execute failed ", syncTaskId), e.getCause());
+ throw new SyncTaskExecuteException(String.format("Task %s execute failed ", getTaskId()), e.getCause());
}
}
@@ -153,10 +157,6 @@ public final class InventoryDataScalingTask implements ScalingTask {
@Override
public SyncProgress getProgress() {
- return new InventoryDataSyncTaskProgress(syncTaskId, estimatedRows, syncedRows.get());
- }
-
- @Override
- public void run() {
+ return new InventoryDataSyncTaskProgress(getTaskId(), estimatedRows, syncedRows.get());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
index d85fc9f..80eefed 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
@@ -17,18 +17,19 @@
package org.apache.shardingsphere.scaling.core.job.task.inventory;
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import lombok.extern.slf4j.Slf4j;
-
import java.util.Collection;
/**
* Inventory data sync task group.
*/
@Slf4j
+@Getter
public final class InventoryDataScalingTaskGroup extends AbstractShardingScalingExecutor implements ScalingTask {
private final Collection<ScalingTask> scalingTasks;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index c18705d..30be580 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -62,6 +62,10 @@ public final class ScalingTaskScheduler implements Runnable {
public void run() {
shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA.name());
ExecuteCallback inventoryDataTaskCallback = createInventoryDataTaskCallback();
+ if (shardingScalingJob.getInventoryDataTasks().size() == 0) {
+ executeIncrementalDataSyncTask();
+ return;
+ }
for (ScalingTask each : shardingScalingJob.getInventoryDataTasks()) {
ScalingContext.getInstance().getTaskExecuteEngine().submit(each, inventoryDataTaskCallback);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 1ea3139..4e14486 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.spi;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeAwareSPI;
@@ -44,11 +44,11 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
Class<? extends LogDumper> getLogDumperClass();
/**
- * Get log position manager type.
+ * Get position manager type.
*
- * @return log manager type
+ * @return position manager type
*/
- Class<? extends LogPositionManager> getLogPositionManager();
+ Class<? extends PositionManager> getPositionManager();
/**
* Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
index a5cd6e0..66c0774 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
@@ -17,6 +17,9 @@
package org.apache.shardingsphere.scaling.core.config;
+import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -38,8 +41,9 @@ public final class RdbmsConfigurationTest {
@Test
public void assertGetWhereCondition() {
RdbmsConfiguration rdbmsConfiguration = new RdbmsConfiguration();
- assertThat(rdbmsConfiguration.getWhereCondition(), is(""));
- rdbmsConfiguration.setWhereCondition("WHERE 1=1");
- assertThat(rdbmsConfiguration.getWhereCondition(), is("WHERE 1=1"));
+ assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is(""));
+ rdbmsConfiguration.setPrimaryKey("id");
+ rdbmsConfiguration.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 10)));
+ assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is("WHERE id BETWEEN 0 AND 10"));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 5355fc8..abd4069 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -24,17 +24,19 @@ import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -43,7 +45,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.sql.DataSource;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -143,12 +144,12 @@ public final class AbstractJDBCImporterTest {
private List<Record> mockRecords(final DataRecord dataRecord) {
List<Record> result = new LinkedList<>();
result.add(dataRecord);
- result.add(new FinishedRecord(new NopLogPosition()));
+ result.add(new FinishedRecord(new NopPosition()));
return result;
}
private DataRecord getDataRecord(final String recordType) {
- DataRecord result = new DataRecord(new NopLogPosition(), 3);
+ DataRecord result = new DataRecord(new NopPosition(), 3);
result.setTableName(TABLE_NAME);
result.setType(recordType);
result.addColumn(new Column("id", 1, false, true));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index 22f9ea8..3419d46 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.junit.Before;
@@ -89,7 +89,7 @@ public class AbstractSqlBuilderTest {
}
private DataRecord mockDataRecord(final String tableName) {
- DataRecord result = new DataRecord(new NopLogPosition(), 4);
+ DataRecord result = new DataRecord(new NopPosition(), 4);
result.setTableName(tableName);
result.addColumn(new Column("id", "", false, true));
result.addColumn(new Column("sc", "", false, false));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 5fd7b4c..19f1f95 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.fixture;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
@@ -37,7 +37,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends LogPositionManager> getLogPositionManager() {
+ public Class<? extends PositionManager> getPositionManager() {
return null;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
new file mode 100644
index 0000000..34a679e
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
+
+import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
+import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.resume.ResumablePositionManagerFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public final class SyncPositionResumerTest {
+
+ private static final String DATA_SOURCE_URL = "jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
+
+ private static final String USERNAME = "root";
+
+ private static final String PASSWORD = "password";
+
+ private ShardingScalingJob shardingScalingJob;
+
+ private ResumablePositionManager resumablePositionManager;
+
+ private SyncPositionResumer syncPositionResumer;
+
+ @Before
+ public void setUp() {
+ ScalingContext.getInstance().init(new ServerConfiguration());
+ shardingScalingJob = new ShardingScalingJob("scalingTest", 0);
+ shardingScalingJob.getSyncConfigurations().add(mockSyncConfiguration());
+ resumablePositionManager = ResumablePositionManagerFactory.newInstance("MySQL", "/scalingTest/item-0");
+ syncPositionResumer = new SyncPositionResumer();
+ }
+
+ @Test
+ public void assertResumePosition() {
+ resumablePositionManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
+ resumablePositionManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
+ syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumablePositionManager);
+ assertEquals(shardingScalingJob.getIncrementalDataTasks().size(), 1);
+ assertEquals(shardingScalingJob.getInventoryDataTasks().size(), 3);
+ }
+
+ @Test
+ public void assertPersistPosition() {
+ ResumablePositionManager resumablePositionManager = mock(ResumablePositionManager.class);
+ syncPositionResumer.persistPosition(shardingScalingJob, resumablePositionManager);
+ verify(resumablePositionManager).persistIncrementalPosition();
+ verify(resumablePositionManager).persistInventoryPosition();
+ }
+
+ private PositionManager mockPositionManager() {
+ return new PositionManager() {
+ @Override
+ public Position getCurrentPosition() {
+ return null;
+ }
+
+ @Override
+ public void updateCurrentPosition(final Position newPosition) {
+
+ }
+ };
+ }
+
+ private SyncConfiguration mockSyncConfiguration() {
+ RdbmsConfiguration dumperConfig = mockDumperConfig();
+ RdbmsConfiguration importerConfig = new RdbmsConfiguration();
+ Map<String, String> tableMap = new HashMap<>();
+ tableMap.put("t_order", "t_order");
+ return new SyncConfiguration(3, tableMap,
+ dumperConfig, importerConfig);
+ }
+
+ private RdbmsConfiguration mockDumperConfig() {
+ DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
+ RdbmsConfiguration result = new RdbmsConfiguration();
+ result.setDataSourceName("ds0");
+ result.setDataSourceConfiguration(dataSourceConfiguration);
+ return result;
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
index deae566..87d34fb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.scaling.core.job.task.inventory;
-import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.junit.After;
import org.junit.Before;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index 517444c..a1ffd8c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -25,6 +25,8 @@ import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -94,6 +96,7 @@ public final class InventoryDataScalingTaskTest {
RdbmsConfiguration result = new RdbmsConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration);
result.setTableName("t_order");
+ result.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(1, 100)));
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java
index 5599dc6..3549079 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/BinlogPosition.java
@@ -17,7 +17,11 @@
package org.apache.shardingsphere.scaling.mysql;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.Expose;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -27,16 +31,20 @@ import lombok.Setter;
/**
* Binlog Position.
*/
-@Getter
-@Setter
-@RequiredArgsConstructor
@AllArgsConstructor
-public class BinlogPosition implements LogPosition<BinlogPosition> {
+@RequiredArgsConstructor
+@Setter
+@Getter
+public class BinlogPosition implements Position<BinlogPosition> {
private static final long serialVersionUID = -4917415481787093677L;
+ private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
+
+ @Expose
private final String filename;
+ @Expose
private final long position;
private long serverId;
@@ -54,4 +62,9 @@ public class BinlogPosition implements LogPosition<BinlogPosition> {
private long toLong() {
return Long.parseLong(filename.substring(filename.lastIndexOf(".") + 1)) << 32 | position;
}
+
+ @Override
+ public JsonElement toJson() {
+ return GSON.toJsonTree(this);
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index b1bb0e4..aaadbba 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -22,9 +22,9 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
-import org.apache.shardingsphere.scaling.core.job.position.NopLogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
@@ -65,7 +65,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
@Setter
private Channel channel;
- public MySQLBinlogDumper(final RdbmsConfiguration rdbmsConfiguration, final LogPosition binlogPosition) {
+ public MySQLBinlogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position binlogPosition) {
this.binlogPosition = (BinlogPosition) binlogPosition;
if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("MySQLBinlogDumper only support JDBCDataSourceConfiguration");
@@ -93,7 +93,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor imp
handleEvent(channel, uri, event);
}
}
- pushRecord(channel, new FinishedRecord(new NopLogPosition()));
+ pushRecord(channel, new FinishedRecord(new NopPosition()));
}
private void handleEvent(final Channel channel, final JdbcUri uri, final AbstractBinlogEvent event) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
index c34538c..521618c 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
@@ -18,9 +18,9 @@
package org.apache.shardingsphere.scaling.mysql;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
/**
* MySQL importer.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
similarity index 74%
rename from shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
index 666d798..c55859c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.scaling.mysql;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import com.google.gson.Gson;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -27,15 +27,24 @@ import java.sql.ResultSet;
import java.sql.SQLException;
/**
- * MySQL log manager, based on binlog mechanism.
+ * MySQL position manager, based on binlog mechanism.
*/
-@RequiredArgsConstructor
-public final class MySQLLogPositionManager implements LogPositionManager<BinlogPosition> {
+public final class MySQLPositionManager implements PositionManager<BinlogPosition> {
- private final DataSource dataSource;
+ private static final Gson GSON = new Gson();
+
+ private DataSource dataSource;
private BinlogPosition currentPosition;
+ public MySQLPositionManager(final DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public MySQLPositionManager(final String position) {
+ currentPosition = GSON.fromJson(position, BinlogPosition.class);
+ }
+
@Override
public BinlogPosition getCurrentPosition() {
if (null == currentPosition) {
@@ -60,7 +69,7 @@ public final class MySQLLogPositionManager implements LogPositionManager<BinlogP
}
@Override
- public void updateCurrentPosition(final BinlogPosition newLogPosition) {
- this.currentPosition = newLogPosition;
+ public void updateCurrentPosition(final BinlogPosition newPosition) {
+ this.currentPosition = newPosition;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index 51e2959..29dfb1b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.mysql;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -40,8 +40,8 @@ public final class MySQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends LogPositionManager> getLogPositionManager() {
- return MySQLLogPositionManager.class;
+ public Class<? extends PositionManager> getPositionManager() {
+ return MySQLPositionManager.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
new file mode 100644
index 0000000..33dd347
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumablePositionManagerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.job.position.resume;
+
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
+import org.apache.shardingsphere.scaling.mysql.MySQLPositionManager;
+import org.apache.shardingsphere.scaling.mysql.MySQLScalingEntry;
+import org.apache.shardingsphere.scaling.utils.ReflectionUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public final class AbstractResumablePositionManagerTest {
+
+ private AbstractResumablePositionManager resumablePositionManager;
+
+ private final String incrementalPosition = "{\"ds0\":{\"filename\":\"mysql-bin.000001\",\"position\":4},\"ds1\":{\"filename\":\"mysql-bin.000002\",\"position\":4}}";
+
+ private final String inventoryPosition = "{\"unfinish\":{\"ds1.t_order_1#0\":[0,200],\"ds0.t_order_1#0\":[0,100]},\"finished\":[\"ds0.t_order_1#1\"]}";
+
+ @Before
+ public void setUp() throws Exception {
+ resumablePositionManager = new AbstractResumablePositionManager() {
+ };
+ resumablePositionManager.setDatabaseType("MySQL");
+ resumablePositionManager.setTaskPath("/scalingTest/item-0");
+ ReflectionUtil.getFieldValueFromClass(new ScalingEntryLoader(), "SCALING_ENTRY_MAP", Map.class).put("MySQL", new MySQLScalingEntry());
+ }
+
+ @Test
+ public void assertResumeIncrementalPosition() {
+ resumablePositionManager.resumeIncrementalPosition(incrementalPosition);
+ assertEquals(resumablePositionManager.getIncrementalPositionManagerMap().size(), 2);
+ }
+
+ @Test
+ public void assertResumeInventoryPosition() {
+ resumablePositionManager.resumeInventoryPosition(inventoryPosition);
+ assertEquals(resumablePositionManager.getInventoryPositionManagerMap().size(), 3);
+ }
+
+ @Test
+ public void assertGetIncrementalPositionData() {
+ resumablePositionManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
+ resumablePositionManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
+ assertEquals(resumablePositionManager.getIncrementalPositionData(), incrementalPosition);
+ }
+
+ @Test
+ public void assertGetInventoryPositionData() {
+ resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
+ resumablePositionManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
+ resumablePositionManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 200)));
+ assertEquals(resumablePositionManager.getInventoryPositionData(), inventoryPosition);
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
similarity index 88%
rename from shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
index 92d13dd..9b0c753 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLLogPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
@@ -35,7 +35,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class MySQLLogPositionManagerTest {
+public final class MySQLPositionManagerTest {
private static final String LOG_FILE_NAME = "binlog-000001";
@@ -60,8 +60,8 @@ public final class MySQLLogPositionManagerTest {
@Test
public void assertGetCurrentPosition() {
- MySQLLogPositionManager mysqlLogManager = new MySQLLogPositionManager(dataSource);
- BinlogPosition actual = mysqlLogManager.getCurrentPosition();
+ MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
+ BinlogPosition actual = mysqlPositionManager.getCurrentPosition();
assertThat(actual.getServerId(), is(SERVER_ID));
assertThat(actual.getFilename(), is(LOG_FILE_NAME));
assertThat(actual.getPosition(), is(LOG_POSITION));
@@ -69,10 +69,10 @@ public final class MySQLLogPositionManagerTest {
@Test
public void assertUpdateCurrentPosition() {
- MySQLLogPositionManager mysqlLogManager = new MySQLLogPositionManager(dataSource);
+ MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME, LOG_POSITION, SERVER_ID);
- mysqlLogManager.updateCurrentPosition(expected);
- assertThat(mysqlLogManager.getCurrentPosition(), is(expected));
+ mysqlPositionManager.updateCurrentPosition(expected);
+ assertThat(mysqlPositionManager.getCurrentPosition(), is(expected));
}
private PreparedStatement mockPositionStatement() throws SQLException {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
index 7e85a70..5995228 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
@@ -18,9 +18,9 @@
package org.apache.shardingsphere.scaling.postgresql;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
/**
* postgreSQL importer.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
similarity index 84%
rename from shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
index 4f025fb..df3193f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
@@ -17,8 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.util.PSQLException;
@@ -29,10 +28,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
/**
- * PostgreSQL log position manager.
+ * PostgreSQL position manager.
*/
-@RequiredArgsConstructor
-public final class PostgreSQLLogPositionManager implements LogPositionManager<WalPosition> {
+public final class PostgreSQLPositionManager implements PositionManager<WalPosition> {
public static final String SLOT_NAME = "sharding_scaling";
@@ -40,10 +38,18 @@ public final class PostgreSQLLogPositionManager implements LogPositionManager<Wa
public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
- private final DataSource dataSource;
+ private DataSource dataSource;
private WalPosition currentPosition;
+ public PostgreSQLPositionManager(final DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public PostgreSQLPositionManager(final String position) {
+ this.currentPosition = new WalPosition(LogSequenceNumber.valueOf(position));
+ }
+
@Override
public WalPosition getCurrentPosition() {
if (null == currentPosition) {
@@ -89,7 +95,7 @@ public final class PostgreSQLLogPositionManager implements LogPositionManager<Wa
}
@Override
- public void updateCurrentPosition(final WalPosition newLogPosition) {
- currentPosition = newLogPosition;
+ public void updateCurrentPosition(final WalPosition newPosition) {
+ currentPosition = newPosition;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index e250552..e70aaba 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.postgresql;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
-import org.apache.shardingsphere.scaling.core.job.position.LogPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
@@ -40,8 +40,8 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends LogPositionManager> getLogPositionManager() {
- return PostgreSQLLogPositionManager.class;
+ public Class<? extends PositionManager> getPositionManager() {
+ return PostgreSQLPositionManager.class;
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index eb5f7ef..4811a9a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
@@ -58,8 +58,8 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor i
@Setter
private Channel channel;
- public PostgreSQLWalDumper(final RdbmsConfiguration rdbmsConfiguration, final LogPosition logPosition) {
- walPosition = (WalPosition) logPosition;
+ public PostgreSQLWalDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) {
+ walPosition = (WalPosition) position;
if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration");
}
@@ -79,7 +79,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor i
PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration());
decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils());
PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection,
- PostgreSQLLogPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
+ PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
while (isRunning()) {
ByteBuffer msg = stream.readPending();
if (msg == null) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java
index 8fdc1ba..39eabec 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/WalPosition.java
@@ -17,20 +17,24 @@
package org.apache.shardingsphere.scaling.postgresql;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.LogPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.postgresql.replication.LogSequenceNumber;
/**
* PostgreSQL wal position.
*/
-@Getter
@RequiredArgsConstructor
-public class WalPosition implements LogPosition<WalPosition> {
+@Getter
+public class WalPosition implements Position<WalPosition> {
private static final long serialVersionUID = -3498484556749679001L;
+ private static final Gson GSON = new Gson();
+
private final LogSequenceNumber logSequenceNumber;
@Override
@@ -42,4 +46,9 @@ public class WalPosition implements LogPosition<WalPosition> {
long o2 = walPosition.getLogSequenceNumber().asLong();
return Long.compare(o1, o2);
}
+
+ @Override
+ public JsonElement toJson() {
+ return GSON.toJsonTree(logSequenceNumber.asLong());
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
similarity index 84%
rename from shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManagerTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
index 7632b03..2e6356b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLLogPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class PostgreSQLLogPositionManagerTest {
+public final class PostgreSQLPositionManagerTest {
private static final String POSTGRESQL_96_LSN = "0/14EFDB8";
@@ -66,35 +66,35 @@ public final class PostgreSQLLogPositionManagerTest {
@Test
public void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
- PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+ PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
- WalPosition actual = postgreSQLLogManager.getCurrentPosition();
+ WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
}
@Test
public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
- PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+ PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
- WalPosition actual = postgreSQLLogManager.getCurrentPosition();
+ WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
}
@Test(expected = RuntimeException.class)
public void assertGetCurrentPositionThrowException() throws SQLException {
- PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+ PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
- postgreSQLLogManager.getCurrentPosition();
+ postgreSQLPositionManager.getCurrentPosition();
}
@Test
public void assertUpdateCurrentPosition() {
- PostgreSQLLogPositionManager postgreSQLLogManager = new PostgreSQLLogPositionManager(dataSource);
+ PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
WalPosition expected = new WalPosition(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN));
- postgreSQLLogManager.updateCurrentPosition(expected);
- assertThat(postgreSQLLogManager.getCurrentPosition(), is(expected));
+ postgreSQLPositionManager.updateCurrentPosition(expected);
+ assertThat(postgreSQLPositionManager.getCurrentPosition(), is(expected));
}
private PreparedStatement mockPostgreSQL96Lsn() throws SQLException {