You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/06 09:07:43 UTC
[shardingsphere] branch master updated: Remove Position generics and Serializable declare. (#6663)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ffa6760 Remove Position generics and Serializable declare. (#6663)
ffa6760 is described below
commit ffa6760b0154fae54c7e7db2dbfe4c297ac883ac
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Aug 6 17:07:30 2020 +0800
Remove Position generics and Serializable declare. (#6663)
* Remove Position generics and Serializable declare.
* Remove Position generics and Serializable declare.
* add final keyword
* add blank line.
* remove blank line.
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/fixture/FixtureNopManager.java | 9 ++--
.../core/config/utils/RdbmsConfigurationUtil.java | 2 +-
.../executor/dumper/AbstractJDBCDumper.java | 2 +-
.../{Position.java => BasePositionManager.java} | 25 ++++++-----
.../scaling/core/job/position/NopPosition.java | 6 +--
.../scaling/core/job/position/Position.java | 5 +--
.../scaling/core/job/position/PositionManager.java | 10 ++---
.../core/job/position/PrimaryKeyPosition.java | 4 +-
.../job/position/PrimaryKeyPositionManager.java | 17 ++-----
.../resume/AbstractResumeBreakPointManager.java | 11 ++---
.../position/resume/ResumeBreakPointManager.java | 3 +-
.../job/preparer/ShardingScalingJobPreparer.java | 2 +-
.../incremental/IncrementalDataScalingTask.java | 6 +--
.../task/inventory/InventoryDataScalingTask.java | 2 +-
.../preparer/resumer/SyncPositionResumerTest.java | 19 +-------
.../scaling/mysql/MySQLPositionManager.java | 47 ++++++++++---------
.../scaling/mysql/binlog/BinlogPosition.java | 10 ++---
.../AbstractResumeBreakPointManagerTest.java | 7 +--
.../scaling/mysql/MySQLPositionManagerTest.java | 6 +--
.../postgresql/PostgreSQLPositionManager.java | 52 +++++++++++-----------
.../scaling/postgresql/PostgreSQLWalDumper.java | 3 +-
.../scaling/postgresql/wal/WalPosition.java | 10 ++---
.../postgresql/PostgreSQLPositionManagerTest.java | 10 ++---
23 files changed, 118 insertions(+), 150 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
index 0a30ccc..60f0e46 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureNopManager.java
@@ -18,22 +18,19 @@
package org.apache.shardingsphere.scaling.fixture;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import javax.sql.DataSource;
@RequiredArgsConstructor
-public final class FixtureNopManager implements PositionManager<NopPosition> {
+public final class FixtureNopManager extends BasePositionManager<NopPosition> implements PositionManager<NopPosition> {
private final DataSource dataSource;
@Override
- public NopPosition getCurrentPosition() {
+ public NopPosition getPosition() {
return new NopPosition();
}
-
- @Override
- public void updateCurrentPosition(final NopPosition newPosition) {
- }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
index 074fe71..9d9b97d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/utils/RdbmsConfigurationUtil.java
@@ -43,7 +43,7 @@ public final class RdbmsConfigurationUtil {
if (null == primaryKey || null == positionManager) {
return "";
}
- PrimaryKeyPosition position = positionManager.getCurrentPosition();
+ PrimaryKeyPosition position = positionManager.getPosition();
return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, position.getBeginValue(), position.getEndValue());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index c7fa660..1f88348 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -109,7 +109,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
if (null == rdbmsConfiguration.getPrimaryKey()) {
return new PrimaryKeyPosition.PlaceholderPosition();
}
- return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getCurrentPosition()).getEndValue());
+ return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getPosition()).getEndValue());
}
protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/BasePositionManager.java
similarity index 72%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/BasePositionManager.java
index c6f3eb2..578232a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/BasePositionManager.java
@@ -17,20 +17,21 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import com.google.gson.JsonElement;
-
-import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
/**
- * Position interface.
+ * Base position manager.
+ *
+ * @param <T> Position
*/
-// TODO check Serializable is needed
-public interface Position<T> extends Comparable<T>, Serializable {
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class BasePositionManager<T extends Position> implements PositionManager<T> {
- /**
- * To json element.
- *
- * @return json element
- */
- JsonElement toJson();
+ private T position;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
index 850c276..1947edb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/NopPosition.java
@@ -23,12 +23,10 @@ import com.google.gson.JsonObject;
/**
* Nop position.
*/
-public final class NopPosition implements Position<NopPosition> {
-
- private static final long serialVersionUID = 1946907178847169020L;
+public final class NopPosition implements Position {
@Override
- public int compareTo(final NopPosition nopPosition) {
+ public int compareTo(final Position position) {
return 0;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
index c6f3eb2..2d6801d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/Position.java
@@ -19,13 +19,10 @@ package org.apache.shardingsphere.scaling.core.job.position;
import com.google.gson.JsonElement;
-import java.io.Serializable;
-
/**
* Position interface.
*/
-// TODO check Serializable is needed
-public interface Position<T> extends Comparable<T>, Serializable {
+public interface Position extends Comparable<Position> {
/**
* To json element.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
index fac5976..181c86f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionManager.java
@@ -25,16 +25,16 @@ package org.apache.shardingsphere.scaling.core.job.position;
public interface PositionManager<T extends Position> {
/**
- * Get current position.
+ * Get position.
*
* @return position
*/
- T getCurrentPosition();
+ T getPosition();
/**
- * Update currentPosition.
+ * Set Position.
*
- * @param newPosition new position.
+ * @param position position.
*/
- void updateCurrentPosition(T newPosition);
+ void setPosition(T position);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
index 717378a..8ac8736 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPosition.java
@@ -35,8 +35,6 @@ import java.util.List;
@Setter
public class PrimaryKeyPosition implements Position {
- private static final long serialVersionUID = 8101879950564531329L;
-
private static final Gson GSON = new Gson();
private long beginValue;
@@ -44,7 +42,7 @@ public class PrimaryKeyPosition implements Position {
private long endValue;
@Override
- public int compareTo(final Object position) {
+ public int compareTo(final Position position) {
if (null == position) {
return 1;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
index b101aaf..67e411f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PrimaryKeyPositionManager.java
@@ -17,23 +17,12 @@
package org.apache.shardingsphere.scaling.core.job.position;
-import lombok.AllArgsConstructor;
-
/**
* Primary key position manager.
*/
-@AllArgsConstructor
-public final class PrimaryKeyPositionManager implements PositionManager<PrimaryKeyPosition> {
-
- private PrimaryKeyPosition position;
-
- @Override
- public PrimaryKeyPosition getCurrentPosition() {
- return position;
- }
+public final class PrimaryKeyPositionManager extends BasePositionManager<PrimaryKeyPosition> implements PositionManager<PrimaryKeyPosition> {
- @Override
- public void updateCurrentPosition(final PrimaryKeyPosition newPosition) {
- position = newPosition;
+ public PrimaryKeyPositionManager(final PrimaryKeyPosition position) {
+ super(position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
index 60b515b..d5aa51b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManager.java
@@ -26,6 +26,7 @@ import com.google.gson.JsonParser;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManagerFactory;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
@@ -53,7 +54,7 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
private final Map<String, PositionManager<PrimaryKeyPosition>> inventoryPositionManagerMap = Maps.newConcurrentMap();
- private final Map<String, PositionManager> incrementalPositionManagerMap = Maps.newConcurrentMap();
+ private final Map<String, PositionManager<Position>> incrementalPositionManagerMap = Maps.newConcurrentMap();
private boolean resumable;
@@ -101,11 +102,11 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
JsonObject unfinished = new JsonObject();
Set<String> finished = Sets.newHashSet();
for (Entry<String, PositionManager<PrimaryKeyPosition>> entry : inventoryPositionManagerMap.entrySet()) {
- if (entry.getValue().getCurrentPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
+ if (entry.getValue().getPosition() instanceof PrimaryKeyPosition.FinishedPosition) {
finished.add(entry.getKey());
continue;
}
- unfinished.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+ unfinished.add(entry.getKey(), entry.getValue().getPosition().toJson());
}
result.add(UNFINISHED, unfinished);
result.add(FINISHED, GSON.toJsonTree(finished));
@@ -114,8 +115,8 @@ public abstract class AbstractResumeBreakPointManager implements ResumeBreakPoin
protected String getIncrementalPositionData() {
JsonObject result = new JsonObject();
- for (Entry<String, PositionManager> entry : incrementalPositionManagerMap.entrySet()) {
- result.add(entry.getKey(), entry.getValue().getCurrentPosition().toJson());
+ for (Entry<String, PositionManager<Position>> entry : incrementalPositionManagerMap.entrySet()) {
+ result.add(entry.getKey(), entry.getValue().getPosition().toJson());
}
return result.toString();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
index e468022..c33e90c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/resume/ResumeBreakPointManager.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
@@ -46,7 +47,7 @@ public interface ResumeBreakPointManager {
*
* @return incremental position map
*/
- Map<String, PositionManager> getIncrementalPositionManagerMap();
+ Map<String, PositionManager<Position>> getIncrementalPositionManagerMap();
/**
* Persist inventory position.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 1822f92..5262f8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -109,7 +109,7 @@ public final class ShardingScalingJobPreparer {
@SuppressWarnings("rawtypes")
private PositionManager instancePositionManager(final String databaseType, final DataSource dataSource) {
PositionManager positionManager = PositionManagerFactory.newInstance(databaseType, dataSource);
- positionManager.getCurrentPosition();
+ positionManager.getPosition();
return positionManager;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 094532d..3c89c3d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -63,7 +63,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
@Override
public void start() {
syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap());
- dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getCurrentPosition());
+ dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getPosition());
Collection<Importer> importers = instanceImporters();
instanceChannel(importers);
Future<?> future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
@@ -94,7 +94,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
private void instanceChannel(final Collection<Importer> importers) {
DistributionChannel channel = new DistributionChannel(importers.size(), records -> {
Record lastHandledRecord = records.get(records.size() - 1);
- getPositionManager().updateCurrentPosition(lastHandledRecord.getPosition());
+ getPositionManager().setPosition(lastHandledRecord.getPosition());
delayMillisecond = System.currentTimeMillis() - lastHandledRecord.getCommitTime();
});
dumper.setChannel(channel);
@@ -122,7 +122,7 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
@Override
public SyncProgress getProgress() {
- return new IncrementalDataSyncTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getCurrentPosition());
+ return new IncrementalDataSyncTaskProgress(getTaskId(), delayMillisecond, getPositionManager().getPosition());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 5745545..021ba04 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -128,7 +128,7 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
count++;
} else if (record instanceof FinishedRecord) {
if (record.getPosition() instanceof PrimaryKeyPosition) {
- getPositionManager().updateCurrentPosition(record.getPosition());
+ getPositionManager().setPosition(record.getPosition());
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index 2abcf50..b975366 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -25,8 +25,7 @@ import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.apache.shardingsphere.scaling.core.job.position.resume.ResumeBreakPointManager;
@@ -69,7 +68,7 @@ public final class SyncPositionResumerTest {
@Test
public void assertResumePosition() {
resumeBreakPointManager.getInventoryPositionManagerMap().put("ds0", new PrimaryKeyPositionManager(new PrimaryKeyPosition(0, 100)));
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", mockPositionManager());
+ resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0.t_order", new BasePositionManager<>());
syncPositionResumer.resumePosition(shardingScalingJob, new DataSourceManager(), resumeBreakPointManager);
assertThat(shardingScalingJob.getIncrementalDataTasks().size(), is(1));
assertTrue(shardingScalingJob.getInventoryDataTasks().isEmpty());
@@ -83,20 +82,6 @@ public final class SyncPositionResumerTest {
verify(resumeBreakPointManager).persistInventoryPosition();
}
- private PositionManager mockPositionManager() {
- return new PositionManager() {
-
- @Override
- public Position getCurrentPosition() {
- return null;
- }
-
- @Override
- public void updateCurrentPosition(final Position newPosition) {
- }
- };
- }
-
private SyncConfiguration mockSyncConfiguration() {
RdbmsConfiguration dumperConfig = mockDumperConfig();
RdbmsConfiguration importerConfig = new RdbmsConfiguration();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
index 574b7e6..a5435f4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManager.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.scaling.mysql;
import com.google.gson.Gson;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
@@ -30,47 +31,53 @@ import java.sql.SQLException;
/**
* MySQL position manager, based on binlog mechanism.
*/
-public final class MySQLPositionManager implements PositionManager<BinlogPosition> {
+public final class MySQLPositionManager extends BasePositionManager<BinlogPosition> implements PositionManager<BinlogPosition> {
private static final Gson GSON = new Gson();
private DataSource dataSource;
- private BinlogPosition currentPosition;
-
public MySQLPositionManager(final DataSource dataSource) {
this.dataSource = dataSource;
}
public MySQLPositionManager(final String position) {
- currentPosition = GSON.fromJson(position, BinlogPosition.class);
+ setPosition(GSON.fromJson(position, BinlogPosition.class));
}
@Override
- public BinlogPosition getCurrentPosition() {
- if (null == currentPosition) {
- getCurrentPositionFromSource();
+ public BinlogPosition getPosition() {
+ BinlogPosition position = super.getPosition();
+ if (null != position) {
+ return position;
}
- return currentPosition;
+ initPosition();
+ return super.getPosition();
}
- private void getCurrentPositionFromSource() {
+ private void initPosition() {
try (Connection connection = dataSource.getConnection()) {
- PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
- ResultSet resultSet = preparedStatement.executeQuery();
- resultSet.next();
- currentPosition = new BinlogPosition(resultSet.getString(1), resultSet.getLong(2));
- preparedStatement = connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'");
- resultSet = preparedStatement.executeQuery();
- resultSet.next();
- currentPosition.setServerId(resultSet.getLong(2));
+ BinlogPosition binlogPosition = getBinlogPosition(connection);
+ binlogPosition.setServerId(getServerId(connection));
+ setPosition(binlogPosition);
} catch (final SQLException ex) {
throw new RuntimeException("markPosition error", ex);
}
}
- @Override
- public void updateCurrentPosition(final BinlogPosition newPosition) {
- currentPosition = newPosition;
+ private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
+ try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ resultSet.next();
+ return new BinlogPosition(resultSet.getString(1), resultSet.getLong(2));
+ }
+ }
+
+ private long getServerId(final Connection connection) throws SQLException {
+ try (PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'");
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ resultSet.next();
+ return resultSet.getLong(2);
+ }
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
index 6c94772..e59dbd1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/binlog/BinlogPosition.java
@@ -35,9 +35,7 @@ import lombok.Setter;
@RequiredArgsConstructor
@Setter
@Getter
-public class BinlogPosition implements Position<BinlogPosition> {
-
- private static final long serialVersionUID = -4917415481787093677L;
+public class BinlogPosition implements Position {
private static final Gson GSON = new GsonBuilder().excludeFieldsWithoutExposeAnnotation().create();
@@ -50,12 +48,12 @@ public class BinlogPosition implements Position<BinlogPosition> {
private long serverId;
@Override
- public final int compareTo(final BinlogPosition binlogPosition) {
- if (null == binlogPosition) {
+ public final int compareTo(final Position position) {
+ if (null == position) {
return 1;
}
long o1 = toLong();
- long o2 = binlogPosition.toLong();
+ long o2 = ((BinlogPosition) position).toLong();
return Long.compare(o1, o2);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
index 443abfd..7315553 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/core/job/position/resume/AbstractResumeBreakPointManagerTest.java
@@ -17,11 +17,12 @@
package org.apache.shardingsphere.scaling.core.job.position.resume;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPositionManager;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.mysql.MySQLPositionManager;
import org.apache.shardingsphere.scaling.mysql.MySQLScalingEntry;
+import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
import org.apache.shardingsphere.scaling.utils.ReflectionUtil;
import org.junit.Before;
import org.junit.Test;
@@ -62,8 +63,8 @@ public final class AbstractResumeBreakPointManagerTest {
@Test
public void assertGetIncrementalPositionData() {
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", new MySQLPositionManager("{\"filename\":\"mysql-bin.000001\",\"position\":4}"));
- resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", new MySQLPositionManager("{\"filename\":\"mysql-bin.000002\",\"position\":4}"));
+ resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds0", new BasePositionManager<>(new BinlogPosition("mysql-bin.000001", 4L)));
+ resumeBreakPointManager.getIncrementalPositionManagerMap().put("ds1", new BasePositionManager<>(new BinlogPosition("mysql-bin.000002", 4L)));
assertThat(resumeBreakPointManager.getIncrementalPositionData(), is(incrementalPosition));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
index 49ab8b0..9ce18cc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLPositionManagerTest.java
@@ -62,7 +62,7 @@ public final class MySQLPositionManagerTest {
@Test
public void assertGetCurrentPosition() {
MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
- BinlogPosition actual = mysqlPositionManager.getCurrentPosition();
+ BinlogPosition actual = mysqlPositionManager.getPosition();
assertThat(actual.getServerId(), is(SERVER_ID));
assertThat(actual.getFilename(), is(LOG_FILE_NAME));
assertThat(actual.getPosition(), is(LOG_POSITION));
@@ -72,8 +72,8 @@ public final class MySQLPositionManagerTest {
public void assertUpdateCurrentPosition() {
MySQLPositionManager mysqlPositionManager = new MySQLPositionManager(dataSource);
BinlogPosition expected = new BinlogPosition(LOG_FILE_NAME, LOG_POSITION, SERVER_ID);
- mysqlPositionManager.updateCurrentPosition(expected);
- assertThat(mysqlPositionManager.getCurrentPosition(), is(expected));
+ mysqlPositionManager.setPosition(expected);
+ assertThat(mysqlPositionManager.getPosition(), is(expected));
}
private PreparedStatement mockPositionStatement() throws SQLException {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
index 08f828c..3d9d525 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManager.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
+import org.apache.shardingsphere.scaling.core.job.position.BasePositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.postgresql.replication.LogSequenceNumber;
@@ -31,7 +32,7 @@ import java.sql.SQLException;
/**
* PostgreSQL position manager.
*/
-public final class PostgreSQLPositionManager implements PositionManager<WalPosition> {
+public final class PostgreSQLPositionManager extends BasePositionManager<WalPosition> implements PositionManager<WalPosition> {
public static final String SLOT_NAME = "sharding_scaling";
@@ -41,37 +42,36 @@ public final class PostgreSQLPositionManager implements PositionManager<WalPosit
private DataSource dataSource;
- private WalPosition currentPosition;
-
public PostgreSQLPositionManager(final DataSource dataSource) {
this.dataSource = dataSource;
}
public PostgreSQLPositionManager(final String position) {
- currentPosition = new WalPosition(LogSequenceNumber.valueOf(position));
+ setPosition(new WalPosition(LogSequenceNumber.valueOf(position)));
}
@Override
- public WalPosition getCurrentPosition() {
- if (null == currentPosition) {
- getCurrentPositionFromSource();
+ public WalPosition getPosition() {
+ WalPosition position = super.getPosition();
+ if (null != position) {
+ return position;
}
- return currentPosition;
+ initPosition();
+ return super.getPosition();
}
- private void getCurrentPositionFromSource() {
+ private void initPosition() {
try (Connection connection = dataSource.getConnection()) {
// Need to create slot first, hold oldest wal event.
createIfNotExists(connection);
- currentPosition = getCurrentLsn(connection);
+ setPosition(getWalPosition(connection));
} catch (final SQLException ex) {
throw new RuntimeException("markPosition error", ex);
}
}
private void createIfNotExists(final Connection connection) throws SQLException {
- try {
- PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN));
+ try (PreparedStatement ps = connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", SLOT_NAME, DECODE_PLUGIN))) {
ps.execute();
} catch (final PSQLException ex) {
if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
@@ -80,23 +80,21 @@ public final class PostgreSQLPositionManager implements PositionManager<WalPosit
}
}
- private WalPosition getCurrentLsn(final Connection connection) throws SQLException {
- String sql;
- if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
- sql = "SELECT PG_CURRENT_XLOG_LOCATION()";
- } else if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
- sql = "SELECT PG_CURRENT_WAL_LSN()";
- } else {
- throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
+ private WalPosition getWalPosition(final Connection connection) throws SQLException {
+ try (PreparedStatement ps = connection.prepareStatement(getSql(connection));
+ ResultSet rs = ps.executeQuery()) {
+ rs.next();
+ return new WalPosition(LogSequenceNumber.valueOf(rs.getString(1)));
}
- PreparedStatement ps = connection.prepareStatement(sql);
- ResultSet rs = ps.executeQuery();
- rs.next();
- return new WalPosition(LogSequenceNumber.valueOf(rs.getString(1)));
}
- @Override
- public void updateCurrentPosition(final WalPosition newPosition) {
- currentPosition = newPosition;
+ private String getSql(final Connection connection) throws SQLException {
+ if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= connection.getMetaData().getDatabaseMinorVersion()) {
+ return "SELECT PG_CURRENT_XLOG_LOCATION()";
+ }
+ if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
+ return "SELECT PG_CURRENT_WAL_LSN()";
+ }
+ throw new RuntimeException("Not support PostgreSQL version:" + connection.getMetaData().getDatabaseProductVersion());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 70f6243..3186a8a 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -18,15 +18,14 @@
package org.apache.shardingsphere.scaling.postgresql;
import lombok.Setter;
-
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
index 8c8e98d..9b56018 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPosition.java
@@ -29,21 +29,19 @@ import org.postgresql.replication.LogSequenceNumber;
*/
@RequiredArgsConstructor
@Getter
-public final class WalPosition implements Position<WalPosition> {
-
- private static final long serialVersionUID = -3498484556749679001L;
+public final class WalPosition implements Position {
private static final Gson GSON = new Gson();
private final LogSequenceNumber logSequenceNumber;
@Override
- public int compareTo(final WalPosition walPosition) {
- if (null == walPosition) {
+ public int compareTo(final Position position) {
+ if (null == position) {
return 1;
}
long o1 = logSequenceNumber.asLong();
- long o2 = walPosition.logSequenceNumber.asLong();
+ long o2 = ((WalPosition) position).logSequenceNumber.asLong();
return Long.compare(o1, o2);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
index a924740..f74c487 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLPositionManagerTest.java
@@ -70,7 +70,7 @@ public final class PostgreSQLPositionManagerTest {
PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
- WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
+ WalPosition actual = postgreSQLPositionManager.getPosition();
assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
}
@@ -78,7 +78,7 @@ public final class PostgreSQLPositionManagerTest {
public void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
- WalPosition actual = postgreSQLPositionManager.getCurrentPosition();
+ WalPosition actual = postgreSQLPositionManager.getPosition();
assertThat(actual.getLogSequenceNumber(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
}
@@ -87,15 +87,15 @@ public final class PostgreSQLPositionManagerTest {
PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
- postgreSQLPositionManager.getCurrentPosition();
+ postgreSQLPositionManager.getPosition();
}
@Test
public void assertUpdateCurrentPosition() {
PostgreSQLPositionManager postgreSQLPositionManager = new PostgreSQLPositionManager(dataSource);
WalPosition expected = new WalPosition(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN));
- postgreSQLPositionManager.updateCurrentPosition(expected);
- assertThat(postgreSQLPositionManager.getCurrentPosition(), is(expected));
+ postgreSQLPositionManager.setPosition(expected);
+ assertThat(postgreSQLPositionManager.getPosition(), is(expected));
}
private PreparedStatement mockPostgreSQL96Lsn() throws SQLException {