You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/06 15:46:36 UTC
[shardingsphere] branch master updated: add generics (#6678)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new acc64bd add generics (#6678)
acc64bd is described below
commit acc64bdf0b9b994ca465c2a63a06e6db8221935c
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Aug 6 23:46:22 2020 +0800
add generics (#6678)
* add generics to GSON.fromJson
* add generics.
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/fixture/FixtureH2ScalingEntry.java | 3 +-
.../scaling/fixture/FixtureNopManager.java | 22 ++++++++--
.../scaling/core/config/RdbmsConfiguration.java | 1 +
.../core/config/utils/RdbmsConfigurationUtil.java | 1 +
.../executor/AbstractShardingScalingExecutor.java | 5 ++-
.../executor/dumper/AbstractJDBCDumper.java | 15 ++++---
.../executor/importer/AbstractJDBCImporter.java | 3 +-
.../scaling/core/job/ShardingScalingJob.java | 6 ++-
...yPositionManager.java => FinishedPosition.java} | 17 ++++++--
...sitionManager.java => IncrementalPosition.java} | 8 +---
...PositionManager.java => InventoryPosition.java} | 8 +---
...nManager.java => InventoryPositionManager.java} | 6 +--
...sitionManager.java => PlaceholderPosition.java} | 17 ++++++--
.../core/job/position/PositionManagerFactory.java | 10 +++--
.../core/job/position/PrimaryKeyPosition.java | 36 +---------------
.../resume/AbstractResumeBreakPointManager.java | 48 +++++++++++-----------
.../position/resume/ResumeBreakPointManager.java | 8 ++--
.../job/position/utils/InventoryPositionUtil.java} | 43 +++++++++----------
.../job/preparer/ShardingScalingJobPreparer.java | 11 ++---
.../job/preparer/resumer/SyncPositionResumer.java | 29 ++++++-------
.../splitter/InventoryDataTaskSplitter.java | 14 ++++---
.../core/job/preparer/utils/JobPrepareUtil.java | 3 +-
.../core/job/task/DefaultSyncTaskFactory.java | 3 +-
.../scaling/core/job/task/ScalingTask.java | 5 ++-
.../scaling/core/job/task/SyncTaskFactory.java | 3 +-
.../incremental/IncrementalDataScalingTask.java | 8 +++-
.../task/inventory/InventoryDataScalingTask.java | 11 +++--
.../inventory/InventoryDataScalingTaskGroup.java | 13 +++---
.../scaling/core/spi/ScalingEntry.java | 3 +-
.../core/config/RdbmsConfigurationTest.java | 11 +++--
.../core/fixture/FixtureH2ScalingEntry.java | 3 +-
.../preparer/resumer/SyncPositionResumerTest.java | 4 +-
.../splitter/InventoryDataTaskSplitterTest.java | 9 ++--
.../inventory/InventoryDataScalingTaskTest.java | 4 +-
.../scaling/mysql/MySQLBinlogDumper.java | 2 +-
.../scaling/mysql/MySQLScalingEntry.java | 3 +-
.../scaling/mysql/binlog/BinlogPosition.java | 6 +--
.../AbstractResumeBreakPointManagerTest.java | 12 +++---
.../scaling/postgresql/PostgreSQLScalingEntry.java | 3 +-
.../scaling/postgresql/PostgreSQLWalDumper.java | 2 +-
.../scaling/postgresql/wal/WalPosition.java | 3 +-
41 files changed, 219 insertions(+), 203 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index a322495..fa25298 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.fixture;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
@@ -37,7 +38,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
+ public Class<? extends PositionManager<IncrementalPosition>> getPositionManager() {
return FixtureNopManager.class;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
index 60f0e46..42d47cb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
@@ -17,20 +17,34 @@
package org.apache.shardingsphere.scaling.fixture;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import javax.sql.DataSource;
@RequiredArgsConstructor
-public final class FixtureNopManager extends BasePositionManager<NopPosition> implements PositionManager<NopPosition> {
+public final class FixtureNopManager extends BasePositionManager<IncrementalPosition> implements PositionManager<IncrementalPosition> {
private final DataSource dataSource;
@Override
- public NopPosition getPosition() {
- return new NopPosition();
+ public IncrementalPosition getPosition() {
+
+ return new IncrementalPosition() {
+ @Override
+ public int compareTo(final Position o) {
+ return 0;
+ }
+
+ @Override
+ public JsonElement toJson() {
+ return new JsonObject();
+ }
+ };
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
index 4468982..bb9549e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
@@ -44,6 +44,7 @@ public final class RdbmsConfiguration implements Cloneable {
private String primaryKey;
+ @SuppressWarnings("rawtypes")
private PositionManager positionManager;
private Integer spiltNum;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
index 9d9b97d..f338b4f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
@@ -35,6 +35,7 @@ public final class RdbmsConfigurationUtil {
* @param rdbmsConfiguration rdbms configuration
* @return SQL where condition
*/
+ @SuppressWarnings("unchecked")
public static String getWhereCondition(final RdbmsConfiguration rdbmsConfiguration) {
return getWhereCondition(rdbmsConfiguration.getPrimaryKey(), rdbmsConfiguration.getPositionManager());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
index 5c39a98..e94fa58 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/AbstractShardingScalingExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
/**
@@ -27,7 +28,7 @@ import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
*/
@Setter
@Getter
-public abstract class AbstractShardingScalingExecutor implements ShardingScalingExecutor {
+public abstract class AbstractShardingScalingExecutor<T extends Position> implements ShardingScalingExecutor {
@Setter(AccessLevel.PROTECTED)
@Getter(AccessLevel.PROTECTED)
@@ -35,7 +36,7 @@ public abstract class AbstractShardingScalingExecutor implements ShardingScaling
private String taskId;
- private PositionManager positionManager;
+ private PositionManager<T> positionManager;
@Override
public void start() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index 1f88348..6da217a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
@@ -31,8 +32,10 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
@@ -47,7 +50,7 @@ import java.sql.SQLException;
* Abstract JDBC dumper implement.
*/
@Slf4j
-public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor implements JDBCDumper {
+public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor<InventoryPosition> implements JDBCDumper {
@Getter(AccessLevel.PROTECTED)
private final RdbmsConfiguration rdbmsConfiguration;
@@ -87,7 +90,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
ResultSet rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
while (isRunning() && rs.next()) {
- DataRecord record = new DataRecord(newPrimaryKeyPosition(rs), metaData.getColumnCount());
+ DataRecord record = new DataRecord(newInventoryPosition(rs), metaData.getColumnCount());
record.setType("BOOTSTRAP-INSERT");
record.setTableName(rdbmsConfiguration.getTableNameMap().get(rdbmsConfiguration.getTableName()));
for (int i = 1; i <= metaData.getColumnCount(); i++) {
@@ -95,7 +98,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
}
pushRecord(record);
}
- pushRecord(new FinishedRecord(new PrimaryKeyPosition.FinishedPosition()));
+ pushRecord(new FinishedRecord(new FinishedPosition()));
} catch (final SQLException ex) {
stop();
channel.close();
@@ -105,9 +108,9 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
}
}
- private PrimaryKeyPosition newPrimaryKeyPosition(final ResultSet rs) throws SQLException {
+ private InventoryPosition newInventoryPosition(final ResultSet rs) throws SQLException {
if (null == rdbmsConfiguration.getPrimaryKey()) {
- return new PrimaryKeyPosition.PlaceholderPosition();
+ return new PlaceholderPosition();
}
return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getPosition()).getEndValue());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index cd268df..4863b8f 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -43,7 +44,7 @@ import java.util.List;
* Abstract JDBC importer implementation.
*/
@Slf4j
-public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor implements Importer {
+public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
private final RdbmsConfiguration rdbmsConfiguration;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
index 2d03acc..aec61ee 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/ShardingScalingJob.java
@@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import java.util.LinkedList;
@@ -42,9 +44,9 @@ public final class ShardingScalingJob {
private final transient List<SyncConfiguration> syncConfigurations = new LinkedList<>();
- private final transient List<ScalingTask> inventoryDataTasks = new LinkedList<>();
+ private final transient List<ScalingTask<InventoryPosition>> inventoryDataTasks = new LinkedList<>();
- private final transient List<ScalingTask> incrementalDataTasks = new LinkedList<>();
+ private final transient List<ScalingTask<IncrementalPosition>> incrementalDataTasks = new LinkedList<>();
private final String jobName;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
similarity index 72%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
index 67e411f..212f168 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/FinishedPosition.java
@@ -17,12 +17,21 @@
package org.apache.shardingsphere.scaling.core.job.position;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
/**
- * Primary key position manager.
+ * Finished position.
*/
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
+public final class FinishedPosition implements InventoryPosition {
+
+ @Override
+ public JsonElement toJson() {
+ return new JsonObject();
+ }
- public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
- super(position);
+ @Override
+ public int compareTo(final Position o) {
+ return 0;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/IncrementalPosition.java
similarity index 75%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/IncrementalPosition.java
index 67e411f..b1048cb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/IncrementalPosition.java
@@ -18,11 +18,7 @@
package org.apache.shardingsphere.scaling.core.job.position;
/**
- * Primary key position manager.
+ * Incremental position interface.
*/
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
-
- public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
- super(position);
- }
+public interface IncrementalPosition extends Position {
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPosition.java
similarity index 75%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPosition.java
index 67e411f..aa956a7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPosition.java
@@ -18,11 +18,7 @@
package org.apache.shardingsphere.scaling.core.job.position;
/**
- * Primary key position manager.
+ * Inventory position interface.
*/
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
-
- public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
- super(position);
- }
+public interface InventoryPosition extends Position {
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPositionManager.java
similarity index 78%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPositionManager.java
index 67e411f..dc54746 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/InventoryPositionManager.java
@@ -18,11 +18,11 @@
package org.apache.shardingsphere.scaling.core.job.position;
/**
- * Primary key position manager.
+ * Inventory position manager.
*/
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
+public final class InventoryPositionManager<T extends InventoryPosition> extends BasePositionManager<T> implements PositionManager<T> {
- public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
+ public InventoryPositionManager(final T position) {
super(position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
similarity index 72%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
index 67e411f..416f594 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PlaceholderPosition.java
@@ -17,12 +17,21 @@
package org.apache.shardingsphere.scaling.core.job.position;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
/**
- * Primary key position manager.
+ * Placeholder position.
*/
-public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
+public final class PlaceholderPosition implements InventoryPosition {
+
+ @Override
+ public JsonElement toJson() {
+ return new JsonArray();
+ }
- public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
- super(position);
+ @Override
+ public int compareTo(final Position o) {
+ return 0;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
index fe0ed84..5466876 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManagerFactory.java
@@ -38,10 +38,11 @@ public final class PositionManagerFactory {
* @param dataSource data source
* @return position manager
*/
+ @SuppressWarnings("unchecked")
@SneakyThrows(ReflectiveOperationException.class)
- public static PositionManager newInstance(final String databaseType, final DataSource dataSource) {
+ public static PositionManager<IncrementalPosition> newInstance(final String databaseType, final DataSource dataSource) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
+ return (PositionManager<IncrementalPosition>) scalingEntry.getPositionManager().getConstructor(DataSource.class).newInstance(dataSource);
}
/**
@@ -51,9 +52,10 @@ public final class PositionManagerFactory {
* @param position position
* @return position manager
*/
+ @SuppressWarnings("unchecked")
@SneakyThrows(ReflectiveOperationException.class)
- public static PositionManager newInstance(final String databaseType, final String position) {
+ public static PositionManager<IncrementalPosition> newInstance(final String databaseType, final String position) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
+ return (PositionManager<IncrementalPosition>) scalingEntry.getPositionManager().getConstructor(String.class).newInstance(position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
index 8ac8736..d3eee8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -24,8 +24,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
-import java.util.List;
-
/**
* Use primary key as position.
*/
@@ -33,7 +31,7 @@ import java.util.List;
@AllArgsConstructor
@Getter
@Setter
-public class PrimaryKeyPosition implements Position {
+public class PrimaryKeyPosition implements InventoryPosition {
private static final Gson GSON = new Gson();
@@ -49,40 +47,8 @@ public class PrimaryKeyPosition implements Position {
return Long.compare(beginValue, ((PrimaryKeyPosition) position).beginValue);
}
- /**
- * Transform primary key position from json to object.
- *
- * @param json json data
- * @return primary key position
- */
- @SuppressWarnings("unchecked")
- public static PrimaryKeyPosition fromJson(final String json) {
- List<Double> values = GSON.fromJson(json, List.class);
- if (2 == values.size()) {
- return new PrimaryKeyPosition(values.get(0).longValue(), values.get(1).longValue());
- }
- return new PlaceholderPosition();
- }
-
@Override
public JsonElement toJson() {
return GSON.toJsonTree(new long[]{beginValue, endValue});
}
-
- /**
- * Finish flag position for inventory task finished.
- */
- public static class FinishedPosition extends PrimaryKeyPosition {
- }
-
- /**
- * Placeholder position for without primary key table.
- */
- public static class PlaceholderPosition extends PrimaryKeyPosition {
-
- @Override
- public JsonElement toJson() {
- return GSON.toJsonTree(new long[0]);
- }
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
index d5aa51b..206285a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
@@ -26,11 +26,13 @@ import com.google.gson.JsonParser;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.utils.InventoryPositionUtil;
import java.io.Closeable;
import java.util.Map;
@@ -52,9 +54,9 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
private static final String FINISHED = "finished";
- private final Map<String, PositionManager<PrimaryKeyPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
+ private final Map<String, PositionManager<InventoryPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
- private final Map<String, PositionManager<Position>> incrementalPositionManagerMap = Maps.newConcurrentMap();
+ private final Map<String, PositionManager<IncrementalPosition>> incrementalPositionManagerMap = Maps.newConcurrentMap();
private boolean resumable;
@@ -75,23 +77,22 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
return;
}
log.info("resume inventory position from {} = {}", taskPath, data);
- InventoryPosition inventoryPosition = InventoryPosition.fromJson(data);
- Map<String, PrimaryKeyPosition> unfinished = inventoryPosition.getUnfinished();
- for (Entry<String, PrimaryKeyPosition> entry : unfinished.entrySet()) {
- inventoryPositionManagerMap.put(entry.getKey(), new PrimaryKeyPositionManager(entry.getValue()));
+ InventoryPositions inventoryPositions = InventoryPositions.fromJson(data);
+ Map<String, InventoryPosition> unfinished = inventoryPositions.getUnfinished();
+ for (Entry<String, InventoryPosition> entry : unfinished.entrySet()) {
+ inventoryPositionManagerMap.put(entry.getKey(), new InventoryPositionManager<>(entry.getValue()));
}
- for (String each : inventoryPosition.getFinished()) {
- inventoryPositionManagerMap.put(each, new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
+ for (String each : inventoryPositions.getFinished()) {
+ inventoryPositionManagerMap.put(each, new InventoryPositionManager<>(new FinishedPosition()));
}
}
- @SuppressWarnings("unchecked")
protected void resumeIncrementalPosition(final String data) {
if (Strings.isNullOrEmpty(data)) {
return;
}
log.info("resume incremental position from {} = {}", taskPath, data);
- Map<String, Object> incrementalPosition = GSON.fromJson(data, Map.class);
+ Map<String, Object> incrementalPosition = GSON.<Map<String, Object>>fromJson(data, Map.class);
for (Entry<String, Object> entry : incrementalPosition.entrySet()) {
getIncrementalPositionManagerMap().put(entry.getKey(), PositionManagerFactory.newInstance(databaseType, entry.getValue().toString()));
}
@@ -101,8 +102,8 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
JsonObject result = new JsonObject();
JsonObject unfinished = new JsonObject();
Set<String> finished = Sets.newHashSet();
- for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : inventoryPositionManagerMap.entrySet()) {
- if (entry.getValue().getPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
+ for (Entry<String, PositionManager<InventoryPosition>> entry : inventoryPositionManagerMap.entrySet()) {
+ if (entry.getValue().getPosition() instanceof FinishedPosition) {
finished.add(entry.getKey());
continue;
}
@@ -115,7 +116,7 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
protected String getIncrementalPositionData() {
JsonObject result = new JsonObject();
- for (Entry<String, PositionManager<Position>> entry : incrementalPositionManagerMap.entrySet()) {
+ for (Entry<String, PositionManager<IncrementalPosition>> entry : incrementalPositionManagerMap.entrySet()) {
result.add(entry.getKey(), entry.getValue().getPosition().toJson());
}
return result.toString();
@@ -127,9 +128,9 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
@Getter
@Setter
- private static final class InventoryPosition {
+ private static final class InventoryPositions {
- private Map<String, PrimaryKeyPosition> unfinished;
+ private Map<String, InventoryPosition> unfinished;
private Set<String> finished;
@@ -139,13 +140,12 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
* @param data json data
* @return inventory position
*/
- @SuppressWarnings("unchecked")
- public static InventoryPosition fromJson(final String data) {
- InventoryPosition result = new InventoryPosition();
+ public static InventoryPositions fromJson(final String data) {
+ InventoryPositions result = new InventoryPositions();
JsonObject json = JsonParser.parseString(data).getAsJsonObject();
- Map<String, Object> unfinished = GSON.fromJson(json.getAsJsonObject(UNFINISHED), Map.class);
- result.setUnfinished(unfinished.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> PrimaryKeyPosition.fromJson(entry.getValue().toString()))));
- result.setFinished(GSON.fromJson(json.getAsJsonArray(FINISHED), Set.class));
+ Map<String, Object> unfinished = GSON.<Map<String, Object>>fromJson(json.getAsJsonObject(UNFINISHED), Map.class);
+ result.setUnfinished(unfinished.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> InventoryPositionUtil.fromJson(entry.getValue().toString()))));
+ result.setFinished(GSON.<Set<String>>fromJson(json.getAsJsonArray(FINISHED), Set.class));
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
index c33e90c..65b9acd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import java.util.Map;
@@ -40,14 +40,14 @@ public interface ResumeBreakPointManager {
*
* @return inventory position map
*/
- Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionManagerMap();
+ Map<String, PositionManager<InventoryPosition>> getInventoryPositionManagerMap();
/**
* Get incremental position map.
*
* @return incremental position map
*/
- Map<String, PositionManager<Position>> getIncrementalPositionManagerMap();
+ Map<String, PositionManager<IncrementalPosition>> getIncrementalPositionManagerMap();
/**
* Persist inventory position.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/utils/InventoryPositionUtil.java
similarity index 51%
copy from shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/utils/InventoryPositionUtil.java
index 9b56018..0c6bbae 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/utils/InventoryPositionUtil.java
@@ -15,38 +15,33 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.postgresql.wal;
+package org.apache.shardingsphere.scaling.core.job.position.utils;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.postgresql.replication.LogSequenceNumber;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+
+import java.util.List;
/**
- * PostgreSQL wal position.
+ * Inventory position util.
*/
-@RequiredArgsConstructor
-@Getter
-public final class WalPosition implements Position {
+public final class InventoryPositionUtil {
private static final Gson GSON = new Gson();
- private final LogSequenceNumber logSequenceNumber;
-
- @Override
- public int compareTo(final Position position) {
- if (null == position) {
- return 1;
+ /**
+ * Transform primary key position from json to object.
+ *
+ * @param json json data
+ * @return primary key position
+ */
+ public static InventoryPosition fromJson(final String json) {
+ List<Double> values = GSON.<List<Double>>fromJson(json, List.class);
+ if (2 == values.size()) {
+ return new PrimaryKeyPosition(values.get(0).longValue(), values.get(1).longValue());
}
- long o1 = logSequenceNumber.asLong();
- long o2 = ((WalPosition) position).logSequenceNumber.asLong();
- return Long.compare(o1, o2);
- }
-
- @Override
- public JsonElement toJson() {
- return GSON.toJsonTree(logSequenceNumber.asLong());
+ return new PlaceholderPosition();
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 5262f8e..e2b4676 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -23,6 +23,8 @@ import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
@@ -89,11 +91,11 @@ public final class ShardingScalingJobPreparer {
}
private void initInventoryDataTasks(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager) {
- List<ScalingTask> allInventoryDataTasks = new LinkedList<>();
+ List<ScalingTask<InventoryPosition>> allInventoryDataTasks = new LinkedList<>();
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(each, dataSourceManager));
}
- for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+ for (Collection<ScalingTask<InventoryPosition>> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
}
}
@@ -106,9 +108,8 @@ public final class ShardingScalingJobPreparer {
}
}
- @SuppressWarnings("rawtypes")
- private PositionManager instancePositionManager(final String databaseType, final DataSource dataSource) {
- PositionManager positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
+ private PositionManager<? extends IncrementalPosition> instancePositionManager(final String databaseType, final DataSource dataSource) {
+ PositionManager<? extends IncrementalPosition> positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
positionManager.getPosition();
return positionManager;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
index 18361a5..9ed0da7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -21,8 +21,9 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.preparer.utils.JobPrepareUtil;
import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
@@ -59,25 +60,25 @@ public final class SyncPositionResumer {
}
private void resumeInventoryPosition(final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
- List<ScalingTask> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumeBreakPointManager);
- for (Collection<ScalingTask> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
+ List<ScalingTask<InventoryPosition>> allInventoryDataTasks = getAllInventoryDataTasks(shardingScalingJob, dataSourceManager, resumeBreakPointManager);
+ for (Collection<ScalingTask<InventoryPosition>> each : JobPrepareUtil.groupInventoryDataTasks(shardingScalingJob.getSyncConfigurations().get(0).getConcurrency(), allInventoryDataTasks)) {
shardingScalingJob.getInventoryDataTasks().add(syncTaskFactory.createInventoryDataSyncTaskGroup(each));
}
}
- private List<ScalingTask> getAllInventoryDataTasks(
+ private List<ScalingTask<InventoryPosition>> getAllInventoryDataTasks(
final ShardingScalingJob shardingScalingJob, final DataSourceManager dataSourceManager, final ResumeBreakPointManager resumeBreakPointManager) {
- List<ScalingTask> result = new LinkedList<>();
+ List<ScalingTask<InventoryPosition>> result = new LinkedList<>();
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
- for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) {
+ for (Entry<String, PositionManager<InventoryPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) {
result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each, metaDataManager, entry)));
}
}
return result;
}
- private SyncConfiguration newSyncConfiguration(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Entry<String, PositionManager<PrimaryKeyPosition>> entry) {
+ private SyncConfiguration newSyncConfiguration(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Entry<String, PositionManager<InventoryPosition>> entry) {
String[] splitTable = entry.getKey().split("#");
RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
@@ -90,7 +91,7 @@ public final class SyncPositionResumer {
splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
}
- private Map<String, PositionManager<PrimaryKeyPosition>> getInventoryPositionMap(
+ private Map<String, PositionManager<InventoryPosition>> getInventoryPositionMap(
final RdbmsConfiguration dumperConfiguration, final ResumeBreakPointManager resumeBreakPointManager) {
Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName()));
return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
@@ -116,8 +117,8 @@ public final class SyncPositionResumer {
persistInventoryPosition(shardingScalingJob.getInventoryDataTasks(), resumeBreakPointManager);
}
- private void persistInventoryPosition(final List<ScalingTask> inventoryDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
- for (ScalingTask each : inventoryDataTasks) {
+ private void persistInventoryPosition(final List<ScalingTask<InventoryPosition>> inventoryDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
+ for (ScalingTask<InventoryPosition> each : inventoryDataTasks) {
if (each instanceof InventoryDataScalingTaskGroup) {
putInventoryDataScalingTask(((InventoryDataScalingTaskGroup) each).getScalingTasks(), resumeBreakPointManager);
}
@@ -125,14 +126,14 @@ public final class SyncPositionResumer {
resumeBreakPointManager.persistInventoryPosition();
}
- private void putInventoryDataScalingTask(final Collection<ScalingTask> scalingTasks, final ResumeBreakPointManager resumeBreakPointManager) {
- for (ScalingTask each : scalingTasks) {
+ private void putInventoryDataScalingTask(final Collection<ScalingTask<InventoryPosition>> scalingTasks, final ResumeBreakPointManager resumeBreakPointManager) {
+ for (ScalingTask<InventoryPosition> each : scalingTasks) {
resumeBreakPointManager.getInventoryPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
}
}
- private void persistIncrementalPosition(final List<ScalingTask> incrementalDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
- for (ScalingTask each : incrementalDataTasks) {
+ private void persistIncrementalPosition(final List<ScalingTask<IncrementalPosition>> incrementalDataTasks, final ResumeBreakPointManager resumeBreakPointManager) {
+ for (ScalingTask<IncrementalPosition> each : incrementalDataTasks) {
resumeBreakPointManager.getIncrementalPositionManagerMap().put(each.getTaskId(), each.getPositionManager());
}
resumeBreakPointManager.persistIncrementalPosition();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index 8930d60..bb39253 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -22,8 +22,10 @@ import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.apache.shardingsphere.scaling.core.job.task.DefaultSyncTaskFactory;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.SyncTaskFactory;
@@ -55,8 +57,8 @@ public final class InventoryDataTaskSplitter {
* @param dataSourceManager data source manager
* @return split inventory data task
*/
- public Collection<ScalingTask> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
- Collection<ScalingTask> result = new LinkedList<>();
+ public Collection<ScalingTask<InventoryPosition>> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
+ Collection<ScalingTask<InventoryPosition>> result = new LinkedList<>();
for (SyncConfiguration each : splitConfiguration(syncConfiguration, dataSourceManager)) {
result.add(syncTaskFactory.createInventoryDataSyncTask(each));
}
@@ -82,7 +84,7 @@ public final class InventoryDataTaskSplitter {
for (String each : syncConfiguration.getTableNameMap().keySet()) {
RdbmsConfiguration dumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
dumperConfig.setTableName(each);
- dumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
+ dumperConfig.setPositionManager(new InventoryPositionManager<>(new PlaceholderPosition()));
result.add(new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
dumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
}
@@ -132,10 +134,10 @@ public final class InventoryDataTaskSplitter {
for (int i = 0; i < concurrency && min <= max; i++) {
RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration);
if (i < concurrency - 1) {
- splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, min + step)));
+ splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, min + step)));
min += step + 1;
} else {
- splitDumperConfig.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(min, max)));
+ splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, max)));
}
splitDumperConfig.setSpiltNum(i);
result.add(new SyncConfiguration(concurrency, syncConfiguration.getTableNameMap(),
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
index c6daa5e..d4d4f94 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/utils/JobPrepareUtil.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.job.preparer.utils;
import com.google.common.collect.Lists;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import java.util.List;
@@ -37,7 +38,7 @@ public final class JobPrepareUtil {
* @param allInventoryDataTasks all inventory data tasks
* @return task group list
*/
- public static List<List<ScalingTask>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask> allInventoryDataTasks) {
+ public static List<List<ScalingTask<InventoryPosition>>> groupInventoryDataTasks(final int taskNumber, final List<ScalingTask<InventoryPosition>> allInventoryDataTasks) {
return Lists.partition(allInventoryDataTasks, taskNumber);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
index f9817b2..6972d8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.scaling.core.job.task;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
@@ -30,7 +31,7 @@ import java.util.Collection;
public final class DefaultSyncTaskFactory implements SyncTaskFactory {
@Override
- public InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(final Collection<ScalingTask> inventoryDataScalingTasks) {
+ public InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(final Collection<ScalingTask<InventoryPosition>> inventoryDataScalingTasks) {
return new InventoryDataScalingTaskGroup(inventoryDataScalingTasks);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
index 66ba771..39b29c2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/ScalingTask.java
@@ -19,12 +19,13 @@ package org.apache.shardingsphere.scaling.core.job.task;
import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
/**
* Sync task interface.
*/
-public interface ScalingTask extends ShardingScalingExecutor {
+public interface ScalingTask<T extends Position> extends ShardingScalingExecutor {
/**
* Get synchronize progress.
@@ -38,7 +39,7 @@ public interface ScalingTask extends ShardingScalingExecutor {
*
* @return position manager
*/
- PositionManager getPositionManager();
+ PositionManager<T> getPositionManager();
/**
* Get task id.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
index 9df782c..e77a331 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.scaling.core.job.task;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTaskGroup;
@@ -35,7 +36,7 @@ public interface SyncTaskFactory {
* @param inventoryDataScalingTasks inventory data sync tasks
* @return inventory data sync task group
*/
- InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(Collection<ScalingTask> inventoryDataScalingTasks);
+ InventoryDataScalingTaskGroup createInventoryDataSyncTaskGroup(Collection<ScalingTask<InventoryPosition>> inventoryDataScalingTasks);
/**
* Create inventory data sync task.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 3c89c3d..8979cf6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer
import org.apache.shardingsphere.scaling.core.execute.executor.importer.ImporterFactory;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import java.util.ArrayList;
@@ -43,7 +44,7 @@ import java.util.concurrent.Future;
* Incremental data execute task.
*/
@Slf4j
-public final class IncrementalDataScalingTask extends AbstractShardingScalingExecutor implements ScalingTask {
+public final class IncrementalDataScalingTask extends AbstractShardingScalingExecutor<IncrementalPosition> implements ScalingTask<IncrementalPosition> {
private final SyncConfiguration syncConfiguration;
@@ -53,6 +54,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
private long delayMillisecond;
+ @SuppressWarnings("unchecked")
public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration) {
this.syncConfiguration = syncConfiguration;
dataSourceManager = new DataSourceManager();
@@ -94,7 +96,9 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
private void instanceChannel(final Collection<Importer> importers) {
DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
Record lastHandledRecord = records.get(records.size() - 1);
- getPositionManager().setPosition(lastHandledRecord.getPosition());
+ if (lastHandledRecord.getPosition() instanceof IncrementalPosition) {
+ getPositionManager().setPosition((IncrementalPosition) lastHandledRecord.getPosition());
+ }
delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
});
dumper.setChannel(channel);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 021ba04..ee09470 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -35,7 +35,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import javax.sql.DataSource;
@@ -50,7 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Table slice execute task.
*/
@Slf4j
-public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor implements ScalingTask {
+public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor<InventoryPosition> implements ScalingTask<InventoryPosition> {
private final SyncConfiguration syncConfiguration;
@@ -66,6 +66,7 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
this(syncConfiguration, new DataSourceManager());
}
+ @SuppressWarnings("unchecked")
public InventoryDataScalingTask(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
this.syncConfiguration = syncConfiguration;
this.dataSourceManager = dataSourceManager;
@@ -126,10 +127,8 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
for (Record record : records) {
if (record instanceof DataRecord) {
count++;
- } else if (record instanceof FinishedRecord) {
- if (record.getPosition() instanceof PrimaryKeyPosition) {
- getPositionManager().setPosition(record.getPosition());
- }
+ } else if (record instanceof FinishedRecord && record.getPosition() instanceof InventoryPosition) {
+ getPositionManager().setPosition((InventoryPosition) record.getPosition());
}
}
syncedRows.addAndGet(count);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
index 80eefed..bae3d2b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroup.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import java.util.Collection;
@@ -30,25 +31,25 @@ import java.util.Collection;
*/
@Slf4j
@Getter
-public final class InventoryDataScalingTaskGroup extends AbstractShardingScalingExecutor implements ScalingTask {
+public final class InventoryDataScalingTaskGroup extends AbstractShardingScalingExecutor<InventoryPosition> implements ScalingTask<InventoryPosition> {
- private final Collection<ScalingTask> scalingTasks;
+ private final Collection<ScalingTask<InventoryPosition>> scalingTasks;
- public InventoryDataScalingTaskGroup(final Collection<ScalingTask> inventoryDataScalingTasks) {
+ public InventoryDataScalingTaskGroup(final Collection<ScalingTask<InventoryPosition>> inventoryDataScalingTasks) {
scalingTasks = inventoryDataScalingTasks;
}
@Override
public void start() {
super.start();
- for (ScalingTask each : scalingTasks) {
+ for (ScalingTask<InventoryPosition> each : scalingTasks) {
each.start();
}
}
@Override
public void stop() {
- for (ScalingTask each : scalingTasks) {
+ for (ScalingTask<InventoryPosition> each : scalingTasks) {
each.stop();
}
}
@@ -56,7 +57,7 @@ public final class InventoryDataScalingTaskGroup extends AbstractShardingScaling
@Override
public SyncProgress getProgress() {
InventoryDataSyncTaskProgressGroup result = new InventoryDataSyncTaskProgressGroup();
- for (ScalingTask each : scalingTasks) {
+ for (ScalingTask<InventoryPosition> each : scalingTasks) {
result.addSyncProgress(each.getProgress());
}
return result;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 4e14486..5d3fdc6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.spi;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
@@ -48,7 +49,7 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
*
* @return position manager type
*/
- Class<? extends PositionManager> getPositionManager();
+ Class<? extends PositionManager<? extends IncrementalPosition>> getPositionManager();
/**
* Get importer type.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
index 66c0774..2b7081f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
@@ -18,14 +18,13 @@
package org.apache.shardingsphere.scaling.core.config;
import org.apache.shardingsphere.scaling.core.config.utils.RdbmsConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
public final class RdbmsConfigurationTest {
@@ -33,9 +32,9 @@ public final class RdbmsConfigurationTest {
public void assertClone() {
RdbmsConfiguration origin = new RdbmsConfiguration();
RdbmsConfiguration clone = RdbmsConfiguration.clone(origin);
- assertTrue(origin.equals(clone));
+ assertThat(clone, is(origin));
origin.setTableName("t1");
- assertFalse(origin.equals(clone));
+ assertNotSame(origin, clone);
}
@Test
@@ -43,7 +42,7 @@ public final class RdbmsConfigurationTest {
RdbmsConfiguration rdbmsConfiguration = new RdbmsConfiguration();
assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is(""));
rdbmsConfiguration.setPrimaryKey("id");
- rdbmsConfiguration.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 10)));
+ rdbmsConfiguration.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(0, 10)));
assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is("WHERE id BETWEEN 0 AND 10"));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 19f1f95..b856a2d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.fixture;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
@@ -37,7 +38,7 @@ public final class FixtureH2ScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
+ public Class<? extends PositionManager<IncrementalPosition>> getPositionManager() {
return null;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index b975366..f537ec1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManagerFactory;
import org.junit.Before;
@@ -67,7 +67,7 @@ public final class SyncPositionResumerTest {
@Test
public void assertResumePosition() {
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
+ resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new InventoryPositionManager(new PrimaryKeyPosition(0, 100)));
resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", new BasePositionManager<>());
syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumeBreakPointManager);
assertThat(shardingScalingJob.getIncrementalDataTasks().size(), is(1));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
index f709925..d46326a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.junit.After;
import org.junit.Before;
@@ -73,7 +74,7 @@ public class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
initIntPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
- Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+ Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(3));
}
@@ -81,7 +82,7 @@ public class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
initCharPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
- Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+ Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -89,7 +90,7 @@ public class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithUnionPrimary() throws SQLException {
initUnionPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
- Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+ Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -97,7 +98,7 @@ public class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
initNoPrimaryEnvironment(syncConfiguration.getDumperConfiguration());
- Collection<ScalingTask> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
+ Collection<ScalingTask<InventoryPosition>> actual = inventoryDataTaskSplitter.splitInventoryData(syncConfiguration, dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index a1ffd8c..1c0189c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -96,7 +96,7 @@ public final class InventoryDataScalingTaskTest {
RdbmsConfiguration result = new RdbmsConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration);
result.setTableName("t_order");
- result.setPositionManager(new PrimaryKeyPositionManager(new PrimaryKeyPosition(1, 100)));
+ result.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(1, 100)));
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index 9e002a4..4ba8fcf 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -54,7 +54,7 @@ import java.util.Random;
* MySQL binlog dumper.
*/
@Slf4j
-public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor implements LogDumper {
+public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<BinlogPosition> implements LogDumper {
private final BinlogPosition binlogPosition;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index 29dfb1b..e00a12f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
/**
* MySQL scaling entry.
@@ -40,7 +41,7 @@ public final class MySQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
+ public Class<? extends PositionManager<BinlogPosition>> getPositionManager() {
return MySQLPositionManager.class;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
index e59dbd1..40801c6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
@@ -21,12 +21,12 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.annotations.Expose;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
/**
* Binlog Position.
@@ -35,7 +35,7 @@ import lombok.Setter;
@RequiredArgsConstructor
@Setter
@Getter
-public class BinlogPosition implements Position {
+public class BinlogPosition implements IncrementalPosition {
private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
index 7315553..89eeb76 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
@@ -18,8 +18,10 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
+import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
import org.apache.shardingsphere.scaling.mysql.MySQLScalingEntry;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
@@ -70,10 +72,10 @@ public final class AbstractResumeBreakPointManagerTest {
@Test
public void assertGetInventoryPositionData() {
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 100L)));
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new PrimaryKeyPositionManager(new PrimaryKeyPosition.FinishedPosition()));
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2", new PrimaryKeyPositionManager(new PrimaryKeyPosition.PlaceholderPosition()));
- resumeBreakPointManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0L, 200L)));
+ resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#0", new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 100L)));
+ resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_1#1", new InventoryPositionManager<>(new FinishedPosition()));
+ resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0.t_order_2", new InventoryPositionManager<>(new PlaceholderPosition()));
+ resumeBreakPointManager.getInventoryPositionManagerMap().put("ds1.t_order_1#0", new InventoryPositionManager<>(new PrimaryKeyPosition(0L, 200L)));
assertThat(resumeBreakPointManager.getInventoryPositionData(), is(inventoryPosition));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index e70aaba..8c4bc2f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
/**
* PostgreSQL scaling entry.
@@ -40,7 +41,7 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
}
@Override
- public Class<? extends PositionManager> getPositionManager() {
+ public Class<? extends PositionManager<WalPosition>> getPositionManager() {
return PostgreSQLPositionManager.class;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 3186a8a..730c703 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -43,7 +43,7 @@ import java.sql.SQLException;
/**
* PostgreSQL WAL dumper.
*/
-public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor implements LogDumper {
+public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<WalPosition> implements LogDumper {
private final WalPosition walPosition;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
index 9b56018..f5c6f60 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.postgresql.replication.LogSequenceNumber;
@@ -29,7 +30,7 @@ import org.postgresql.replication.LogSequenceNumber;
*/
@RequiredArgsConstructor
@Getter
-public final class WalPosition implements Position {
+public final class WalPosition implements IncrementalPosition {
private static final Gson GSON = new Gson();