You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/08/17 11:06:32 UTC
[shardingsphere] branch master updated: Refactor RdbmsConfiguration
(#6883)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bd75ccf Refactor RdbmsConfiguration (#6883)
bd75ccf is described below
commit bd75ccfbee9c7981850cf2d8fbb98afd7c788268
Author: avalon5666 <64...@users.noreply.github.com>
AuthorDate: Mon Aug 17 19:06:17 2020 +0800
Refactor RdbmsConfiguration (#6883)
* Refactor RdbmsConfiguraton(#6687)
* Refactor SyncConfiguration
* Refactor InventoryDataTaskSplitter & SyncTaskFactory
* Refactor DumperConfiguration
* For checkstyle
---
...Configuration.java => DumperConfiguration.java} | 30 +----------
...nfiguration.java => ImporterConfiguration.java} | 20 +++----
...tion.java => InventoryDumperConfiguration.java} | 25 +++++----
.../scaling/core/config/SyncConfiguration.java | 8 +--
.../executor/dumper/AbstractJDBCDumper.java | 24 ++++-----
.../execute/executor/dumper/DumperFactory.java | 29 +++++-----
.../executor/importer/AbstractJDBCImporter.java | 16 +++---
.../execute/executor/importer/ImporterFactory.java | 14 ++---
.../job/preparer/ShardingScalingJobPreparer.java | 2 +-
.../job/preparer/resumer/SyncPositionResumer.java | 19 +++----
.../splitter/InventoryDataTaskSplitter.java | 62 +++++++++++-----------
.../core/job/task/DefaultSyncTaskFactory.java | 12 +++--
.../scaling/core/job/task/SyncTaskFactory.java | 15 ++++--
.../incremental/IncrementalDataScalingTask.java | 28 ++++++----
.../task/inventory/InventoryDataScalingTask.java | 40 +++++++-------
.../scaling/core/utils/RdbmsConfigurationUtil.java | 8 +--
.../scaling/core/utils/SyncConfigurationUtil.java | 20 +++----
.../core/config/RdbmsConfigurationTest.java | 48 -----------------
.../importer/AbstractJDBCImporterTest.java | 9 ++--
.../scaling/core/fixture/FixtureH2JDBCDumper.java | 8 +--
.../scaling/core/fixture/FixtureNopImporter.java | 4 +-
.../preparer/resumer/SyncPositionResumerTest.java | 18 ++++---
.../splitter/InventoryDataTaskSplitterTest.java | 27 +++++-----
.../inventory/InventoryDataScalingTaskTest.java | 34 +++++++-----
.../core/util/SyncConfigurationUtilTest.java | 2 +-
.../scaling/mysql/MySQLBinlogDumper.java | 18 +++----
.../scaling/mysql/MySQLImporter.java | 6 +--
.../scaling/mysql/MySQLJdbcDumper.java | 8 +--
.../scaling/postgresql/PostgreSQLImporter.java | 6 +--
.../scaling/postgresql/PostgreSQLJdbcDumper.java | 8 +--
.../scaling/postgresql/PostgreSQLWalDumper.java | 14 ++---
.../scaling/postgresql/wal/WalEventConverter.java | 16 +++---
32 files changed, 271 insertions(+), 327 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
similarity index 63%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
index bb9549e..4602f39 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/DumperConfiguration.java
@@ -17,51 +17,25 @@
package org.apache.shardingsphere.scaling.core.config;
-import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import java.util.Map;
-import java.util.Set;
/**
- * Relational database management system configuration.
+ * Dumper configuration.
*/
@Setter
@Getter
-@EqualsAndHashCode
-public final class RdbmsConfiguration implements Cloneable {
+public class DumperConfiguration {
private String dataSourceName;
private DataSourceConfiguration dataSourceConfiguration;
- private String tableName;
-
- private Map<String, Set<String>> shardingColumnsMap;
-
- private String primaryKey;
-
@SuppressWarnings("rawtypes")
private PositionManager positionManager;
- private Integer spiltNum;
-
private Map<String, String> tableNameMap;
-
- private int retryTimes;
-
- /**
- * Clone to new rdbms configuration.
- *
- * @param origin origin rdbms configuration
- * @return new rdbms configuration
- */
- @SneakyThrows
- public static RdbmsConfiguration clone(final RdbmsConfiguration origin) {
- return (RdbmsConfiguration) origin.clone();
- }
-
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ImporterConfiguration.java
old mode 100755
new mode 100644
similarity index 70%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ImporterConfiguration.java
index 45a6908..4eda2b4
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ImporterConfiguration.java
@@ -18,25 +18,21 @@
package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import java.util.Map;
+import java.util.Set;
/**
- * Sync configuration.
+ * Importer configuration.
*/
+@Setter
@Getter
-@RequiredArgsConstructor
-public final class SyncConfiguration {
+public final class ImporterConfiguration {
- /**
- * The concurrency of writers.
- */
- private final int concurrency;
+ private DataSourceConfiguration dataSourceConfiguration;
- private final Map<String, String> tableNameMap;
+ private Map<String, Set<String>> shardingColumnsMap;
- private final RdbmsConfiguration dumperConfiguration;
-
- private final RdbmsConfiguration importerConfiguration;
+ private int retryTimes;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
old mode 100755
new mode 100644
similarity index 62%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
index 45a6908..b3e5082
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
@@ -18,25 +18,24 @@
package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-import java.util.Map;
+import lombok.Setter;
/**
- * Sync configuration.
+ * Inventory dumper configuration.
*/
@Getter
-@RequiredArgsConstructor
-public final class SyncConfiguration {
+@Setter
+public final class InventoryDumperConfiguration extends DumperConfiguration {
- /**
- * The concurrency of writers.
- */
- private final int concurrency;
+ private String tableName;
- private final Map<String, String> tableNameMap;
+ private String primaryKey;
- private final RdbmsConfiguration dumperConfiguration;
+ private Integer spiltNum;
- private final RdbmsConfiguration importerConfiguration;
+ public InventoryDumperConfiguration(final DumperConfiguration dumperConfiguration) {
+ setDataSourceName(dumperConfiguration.getDataSourceName());
+ setDataSourceConfiguration(dumperConfiguration.getDataSourceConfiguration());
+ setTableNameMap(dumperConfiguration.getTableNameMap());
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
index 45a6908..a2f9936 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
@@ -20,8 +20,6 @@ package org.apache.shardingsphere.scaling.core.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import java.util.Map;
-
/**
* Sync configuration.
*/
@@ -34,9 +32,7 @@ public final class SyncConfiguration {
*/
private final int concurrency;
- private final Map<String, String> tableNameMap;
-
- private final RdbmsConfiguration dumperConfiguration;
+ private final DumperConfiguration dumperConfiguration;
- private final RdbmsConfiguration importerConfiguration;
+ private final ImporterConfiguration importerConfiguration;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index b157ba6..a47bec2 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -21,8 +21,8 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
@@ -54,7 +54,7 @@ import java.sql.SQLException;
public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor<InventoryPosition> implements JDBCDumper {
@Getter(AccessLevel.PROTECTED)
- private final RdbmsConfiguration rdbmsConfiguration;
+ private final InventoryDumperConfiguration inventoryDumperConfiguration;
private final DataSourceManager dataSourceManager;
@@ -63,18 +63,18 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
@Setter
private Channel channel;
- protected AbstractJDBCDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
+ protected AbstractJDBCDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
+ if (!JDBCDataSourceConfiguration.class.equals(inventoryDumperConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("AbstractJDBCDumper only support JDBCDataSourceConfiguration");
}
- this.rdbmsConfiguration = rdbmsConfiguration;
+ this.inventoryDumperConfiguration = inventoryDumperConfiguration;
this.dataSourceManager = dataSourceManager;
tableMetaData = createTableMetaData();
}
private TableMetaData createTableMetaData() {
- MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()));
- return metaDataManager.getTableMetaData(rdbmsConfiguration.getTableName());
+ MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(inventoryDumperConfiguration.getDataSourceConfiguration()));
+ return metaDataManager.getTableMetaData(inventoryDumperConfiguration.getTableName());
}
@Override
@@ -84,15 +84,15 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
}
private void dump() {
- try (Connection conn = dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()).getConnection()) {
- String sql = String.format("SELECT * FROM %s %s", rdbmsConfiguration.getTableName(), RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration));
+ try (Connection conn = dataSourceManager.getDataSource(inventoryDumperConfiguration.getDataSourceConfiguration()).getConnection()) {
+ String sql = String.format("SELECT * FROM %s %s", inventoryDumperConfiguration.getTableName(), RdbmsConfigurationUtil.getWhereCondition(inventoryDumperConfiguration));
PreparedStatement ps = createPreparedStatement(conn, sql);
ResultSet rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
while (isRunning() && rs.next()) {
DataRecord record = new DataRecord(newInventoryPosition(rs), metaData.getColumnCount());
record.setType(ScalingConstant.INSERT);
- record.setTableName(rdbmsConfiguration.getTableNameMap().get(rdbmsConfiguration.getTableName()));
+ record.setTableName(inventoryDumperConfiguration.getTableNameMap().get(inventoryDumperConfiguration.getTableName()));
for (int i = 1; i <= metaData.getColumnCount(); i++) {
record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i)));
}
@@ -109,10 +109,10 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
}
private InventoryPosition newInventoryPosition(final ResultSet rs) throws SQLException {
- if (null == rdbmsConfiguration.getPrimaryKey()) {
+ if (null == inventoryDumperConfiguration.getPrimaryKey()) {
return new PlaceholderPosition();
}
- return new PrimaryKeyPosition(rs.getLong(rdbmsConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) rdbmsConfiguration.getPositionManager().getPosition()).getEndValue());
+ return new PrimaryKeyPosition(rs.getLong(inventoryDumperConfiguration.getPrimaryKey()), ((PrimaryKeyPosition) inventoryDumperConfiguration.getPositionManager().getPosition()).getEndValue());
}
protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
index 20ae77b..6a355fd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/DumperFactory.java
@@ -20,11 +20,12 @@ package org.apache.shardingsphere.scaling.core.execute.executor.dumper;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
+import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
/**
* Dumper factory.
@@ -35,52 +36,52 @@ public final class DumperFactory {
/**
* New instance of JDBC dumper.
*
- * @param rdbmsConfiguration rdbms configuration
+ * @param inventoryDumperConfiguration inventory dumper configuration
* @param dataSourceManager data source factory
* @return JDBC dumper
*/
@SneakyThrows
- public static JDBCDumper newInstanceJdbcDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- return newInstanceJdbcDumper(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, dataSourceManager);
+ public static JDBCDumper newInstanceJdbcDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
+ return newInstanceJdbcDumper(inventoryDumperConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), inventoryDumperConfiguration, dataSourceManager);
}
/**
* New instance of JDBC dumper.
*
* @param databaseType database type
- * @param rdbmsConfiguration rdbms configuration
+ * @param inventoryDumperConfiguration inventory dumper configuration
* @param dataSourceManager data source factory
* @return JDBC dumper
*/
@SneakyThrows
- public static JDBCDumper newInstanceJdbcDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
+ public static JDBCDumper newInstanceJdbcDumper(final String databaseType, final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getJdbcDumperClass().getConstructor(RdbmsConfiguration.class, DataSourceManager.class).newInstance(rdbmsConfiguration, dataSourceManager);
+ return scalingEntry.getJdbcDumperClass().getConstructor(InventoryDumperConfiguration.class, DataSourceManager.class).newInstance(inventoryDumperConfiguration, dataSourceManager);
}
/**
* New instance of log dumper.
*
- * @param rdbmsConfiguration rdbms configuration
+ * @param dumperConfiguration rdbms configuration
* @param position position
* @return log dumper
*/
@SneakyThrows
- public static LogDumper newInstanceLogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) {
- return newInstanceLogDumper(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, position);
+ public static LogDumper newInstanceLogDumper(final DumperConfiguration dumperConfiguration, final Position position) {
+ return newInstanceLogDumper(dumperConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), dumperConfiguration, position);
}
/**
* New instance of log dumper.
*
* @param databaseType database type
- * @param rdbmsConfiguration rdbms configuration
+ * @param dumperConfiguration rdbms configuration
* @param position position
* @return log dumper
*/
@SneakyThrows
- public static LogDumper newInstanceLogDumper(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final Position position) {
+ public static LogDumper newInstanceLogDumper(final String databaseType, final DumperConfiguration dumperConfiguration, final Position position) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getLogDumperClass().getConstructor(RdbmsConfiguration.class, Position.class).newInstance(rdbmsConfiguration, position);
+ return scalingEntry.getLogDumperClass().getConstructor(DumperConfiguration.class, Position.class).newInstance(dumperConfiguration, position);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index 9221922..a92a10e 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
@@ -47,7 +47,7 @@ import java.util.List;
@Slf4j
public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
- private final RdbmsConfiguration rdbmsConfiguration;
+ private final ImporterConfiguration importerConfiguration;
private final DataSourceManager dataSourceManager;
@@ -56,8 +56,8 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
@Setter
private Channel channel;
- protected AbstractJDBCImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- this.rdbmsConfiguration = rdbmsConfiguration;
+ protected AbstractJDBCImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
+ this.importerConfiguration = importerConfiguration;
this.dataSourceManager = dataSourceManager;
sqlBuilder = createSqlBuilder();
}
@@ -80,7 +80,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
while (isRunning()) {
List<Record> records = channel.fetchRecords(100, 3);
if (null != records && !records.isEmpty()) {
- flush(dataSourceManager.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()), records);
+ flush(dataSourceManager.getDataSource(importerConfiguration.getDataSourceConfiguration()), records);
if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
channel.ack();
break;
@@ -98,7 +98,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private boolean tryFlush(final DataSource dataSource, final List<Record> buffer) {
- int retryTimes = rdbmsConfiguration.getRetryTimes();
+ int retryTimes = importerConfiguration.getRetryTimes();
List<Record> unflushed = buffer;
do {
unflushed = doFlush(dataSource, unflushed);
@@ -152,7 +152,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
- List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
+ List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfiguration.getShardingColumnsMap().get(record.getTableName()));
List<Column> values = new ArrayList<>();
values.addAll(RecordUtil.extractUpdatedColumns(record));
values.addAll(conditionColumns);
@@ -165,7 +165,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
- List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, rdbmsConfiguration.getShardingColumnsMap().get(record.getTableName()));
+ List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfiguration.getShardingColumnsMap().get(record.getTableName()));
String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
PreparedStatement ps = connection.prepareStatement(deleteSql);
for (int i = 0; i < conditionColumns.size(); i++) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
index f124b08..c43de59 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/ImporterFactory.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
@@ -34,25 +34,25 @@ public final class ImporterFactory {
/**
* New instance of importer.
*
- * @param rdbmsConfiguration rdbms configuration
+ * @param importerConfiguration rdbms configuration
* @param dataSourceManager data source factory
* @return importer
*/
- public static Importer newInstance(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- return newInstance(rdbmsConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), rdbmsConfiguration, dataSourceManager);
+ public static Importer newInstance(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
+ return newInstance(importerConfiguration.getDataSourceConfiguration().getDatabaseType().getName(), importerConfiguration, dataSourceManager);
}
/**
* New instance of importer.
*
* @param databaseType database type
- * @param rdbmsConfiguration rdbms configuration
+ * @param importerConfiguration rdbms configuration
* @param dataSourceManager data source factory
* @return importer
*/
@SneakyThrows
- public static Importer newInstance(final String databaseType, final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
+ public static Importer newInstance(final String databaseType, final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
ScalingEntry scalingEntry = ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
- return scalingEntry.getImporterClass().getConstructor(RdbmsConfiguration.class, DataSourceManager.class).newInstance(rdbmsConfiguration, dataSourceManager);
+ return scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, DataSourceManager.class).newInstance(importerConfiguration, dataSourceManager);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index e2b4676..2aa3d37 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -104,7 +104,7 @@ public final class ShardingScalingJobPreparer {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
DataSourceConfiguration dataSourceConfiguration = each.getDumperConfiguration().getDataSourceConfiguration();
each.getDumperConfiguration().setPositionManager(instancePositionManager(databaseType, dataSourceManager.getDataSource(dataSourceConfiguration)));
- shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
+ shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(), each.getDumperConfiguration(), each.getImporterConfiguration()));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
index 9ed0da7..be8642c 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -17,7 +17,8 @@
package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
@@ -72,37 +73,37 @@ public final class SyncPositionResumer {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
MetaDataManager metaDataManager = new MetaDataManager(dataSourceManager.getDataSource(each.getDumperConfiguration().getDataSourceConfiguration()));
for (Entry<String, PositionManager<InventoryPosition>> entry : getInventoryPositionMap(each.getDumperConfiguration(), resumeBreakPointManager).entrySet()) {
- result.add(syncTaskFactory.createInventoryDataSyncTask(newSyncConfiguration(each, metaDataManager, entry)));
+ result.add(syncTaskFactory.createInventoryDataSyncTask(newInventoryDumperConfiguration(each.getDumperConfiguration(), metaDataManager, entry), each.getImporterConfiguration()));
}
}
return result;
}
- private SyncConfiguration newSyncConfiguration(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final Entry<String, PositionManager<InventoryPosition>> entry) {
+ private InventoryDumperConfiguration newInventoryDumperConfiguration(final DumperConfiguration dumperConfiguration, final MetaDataManager metaDataManager,
+ final Entry<String, PositionManager<InventoryPosition>> entry) {
String[] splitTable = entry.getKey().split("#");
- RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
+ InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(dumperConfiguration);
splitDumperConfig.setTableName(splitTable[0].split("\\.")[1]);
splitDumperConfig.setPositionManager(entry.getValue());
if (2 == splitTable.length) {
splitDumperConfig.setSpiltNum(Integer.parseInt(splitTable[1]));
}
splitDumperConfig.setPrimaryKey(metaDataManager.getTableMetaData(splitDumperConfig.getTableName()).getPrimaryKeyColumns().get(0));
- return new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
- splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration()));
+ return splitDumperConfig;
}
private Map<String, PositionManager<InventoryPosition>> getInventoryPositionMap(
- final RdbmsConfiguration dumperConfiguration, final ResumeBreakPointManager resumeBreakPointManager) {
+ final DumperConfiguration dumperConfiguration, final ResumeBreakPointManager resumeBreakPointManager) {
Pattern pattern = Pattern.compile(String.format("%s\\.\\w+(#\\d+)?", dumperConfiguration.getDataSourceName()));
return resumeBreakPointManager.getInventoryPositionManagerMap().entrySet().stream()
.filter(entry -> pattern.matcher(entry.getKey()).find())
.collect(Collectors.toMap(Entry::getKey, Map.Entry::getValue));
}
-
+
private void resumeIncrementalPosition(final ShardingScalingJob shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigurations()) {
each.getDumperConfiguration().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfiguration().getDataSourceName()));
- shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each));
+ shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(), each.getDumperConfiguration(), each.getImporterConfiguration()));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index bb39253..afb75fb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -18,7 +18,8 @@
package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
@@ -59,19 +60,19 @@ public final class InventoryDataTaskSplitter {
*/
public Collection<ScalingTask<InventoryPosition>> splitInventoryData(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
Collection<ScalingTask<InventoryPosition>> result = new LinkedList<>();
- for (SyncConfiguration each : splitConfiguration(syncConfiguration, dataSourceManager)) {
- result.add(syncTaskFactory.createInventoryDataSyncTask(each));
+ for (InventoryDumperConfiguration each : splitDumperConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getDumperConfiguration(), dataSourceManager)) {
+ result.add(syncTaskFactory.createInventoryDataSyncTask(each, syncConfiguration.getImporterConfiguration()));
}
return result;
}
- private Collection<SyncConfiguration> splitConfiguration(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
- Collection<SyncConfiguration> result = new LinkedList<>();
- DataSource dataSource = dataSourceManager.getDataSource(syncConfiguration.getDumperConfiguration().getDataSourceConfiguration());
+ private Collection<InventoryDumperConfiguration> splitDumperConfiguration(final int concurrency, final DumperConfiguration dumperConfiguration, final DataSourceManager dataSourceManager) {
+ Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+ DataSource dataSource = dataSourceManager.getDataSource(dumperConfiguration.getDataSourceConfiguration());
MetaDataManager metaDataManager = new MetaDataManager(dataSource);
- for (SyncConfiguration each : splitByTable(syncConfiguration)) {
- if (isSpiltByPrimaryKeyRange(each.getDumperConfiguration(), metaDataManager)) {
- result.addAll(splitByPrimaryKeyRange(each, metaDataManager, dataSource));
+ for (InventoryDumperConfiguration each : splitByTable(dumperConfiguration)) {
+ if (isSpiltByPrimaryKeyRange(each, metaDataManager)) {
+ result.addAll(splitByPrimaryKeyRange(concurrency, each, metaDataManager, dataSource));
} else {
result.add(each);
}
@@ -79,36 +80,35 @@ public final class InventoryDataTaskSplitter {
return result;
}
- private Collection<SyncConfiguration> splitByTable(final SyncConfiguration syncConfiguration) {
- Collection<SyncConfiguration> result = new LinkedList<>();
- for (String each : syncConfiguration.getTableNameMap().keySet()) {
- RdbmsConfiguration dumperConfig = RdbmsConfiguration.clone(syncConfiguration.getDumperConfiguration());
+ private Collection<InventoryDumperConfiguration> splitByTable(final DumperConfiguration dumperConfiguration) {
+ Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+ for (String each : dumperConfiguration.getTableNameMap().keySet()) {
+ InventoryDumperConfiguration dumperConfig = new InventoryDumperConfiguration(dumperConfiguration);
dumperConfig.setTableName(each);
dumperConfig.setPositionManager(new InventoryPositionManager<>(new PlaceholderPosition()));
- result.add(new SyncConfiguration(syncConfiguration.getConcurrency(), syncConfiguration.getTableNameMap(),
- dumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
+ result.add(dumperConfig);
}
return result;
}
- private boolean isSpiltByPrimaryKeyRange(final RdbmsConfiguration rdbmsConfiguration, final MetaDataManager metaDataManager) {
- TableMetaData tableMetaData = metaDataManager.getTableMetaData(rdbmsConfiguration.getTableName());
+ private boolean isSpiltByPrimaryKeyRange(final InventoryDumperConfiguration inventoryDumperConfiguration, final MetaDataManager metaDataManager) {
+ TableMetaData tableMetaData = metaDataManager.getTableMetaData(inventoryDumperConfiguration.getTableName());
if (null == tableMetaData) {
- log.warn("Can't split range for table {}, reason: can not get table metadata ", rdbmsConfiguration.getTableName());
+ log.warn("Can't split range for table {}, reason: can not get table metadata ", inventoryDumperConfiguration.getTableName());
return false;
}
List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
if (null == primaryKeys || primaryKeys.isEmpty()) {
- log.warn("Can't split range for table {}, reason: no primary key", rdbmsConfiguration.getTableName());
+ log.warn("Can't split range for table {}, reason: no primary key", inventoryDumperConfiguration.getTableName());
return false;
}
if (primaryKeys.size() > 1) {
- log.warn("Can't split range for table {}, reason: primary key is union primary", rdbmsConfiguration.getTableName());
+ log.warn("Can't split range for table {}, reason: primary key is union primary", inventoryDumperConfiguration.getTableName());
return false;
}
int index = tableMetaData.findColumnIndex(primaryKeys.get(0));
if (isNotIntegerPrimary(tableMetaData.getColumnMetaData(index).getDataType())) {
- log.warn("Can't split range for table {}, reason: primary key is not integer number", rdbmsConfiguration.getTableName());
+ log.warn("Can't split range for table {}, reason: primary key is not integer number", inventoryDumperConfiguration.getTableName());
return false;
}
return true;
@@ -118,21 +118,20 @@ public final class InventoryDataTaskSplitter {
return Types.INTEGER != columnType && Types.BIGINT != columnType && Types.SMALLINT != columnType && Types.TINYINT != columnType;
}
- private Collection<SyncConfiguration> splitByPrimaryKeyRange(final SyncConfiguration syncConfiguration, final MetaDataManager metaDataManager, final DataSource dataSource) {
- int concurrency = syncConfiguration.getConcurrency();
- Collection<SyncConfiguration> result = new LinkedList<>();
- RdbmsConfiguration dumperConfiguration = syncConfiguration.getDumperConfiguration();
- String primaryKey = metaDataManager.getTableMetaData(dumperConfiguration.getTableName()).getPrimaryKeyColumns().get(0);
- dumperConfiguration.setPrimaryKey(primaryKey);
+ private Collection<InventoryDumperConfiguration> splitByPrimaryKeyRange(final int concurrency, final InventoryDumperConfiguration inventoryDumperConfiguration,
+ final MetaDataManager metaDataManager, final DataSource dataSource) {
+ Collection<InventoryDumperConfiguration> result = new LinkedList<>();
+ String primaryKey = metaDataManager.getTableMetaData(inventoryDumperConfiguration.getTableName()).getPrimaryKeyColumns().get(0);
+ inventoryDumperConfiguration.setPrimaryKey(primaryKey);
try (Connection connection = dataSource.getConnection()) {
- PreparedStatement ps = connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT 1", primaryKey, primaryKey, dumperConfiguration.getTableName()));
+ PreparedStatement ps = connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT 1", primaryKey, primaryKey, inventoryDumperConfiguration.getTableName()));
ResultSet rs = ps.executeQuery();
rs.next();
long min = rs.getLong(1);
long max = rs.getLong(2);
long step = (max - min) / concurrency;
for (int i = 0; i < concurrency && min <= max; i++) {
- RdbmsConfiguration splitDumperConfig = RdbmsConfiguration.clone(dumperConfiguration);
+ InventoryDumperConfiguration splitDumperConfig = new InventoryDumperConfiguration(inventoryDumperConfiguration);
if (i < concurrency - 1) {
splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, min + step)));
min += step + 1;
@@ -140,11 +139,10 @@ public final class InventoryDataTaskSplitter {
splitDumperConfig.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(min, max)));
}
splitDumperConfig.setSpiltNum(i);
- result.add(new SyncConfiguration(concurrency, syncConfiguration.getTableNameMap(),
- splitDumperConfig, RdbmsConfiguration.clone(syncConfiguration.getImporterConfiguration())));
+ result.add(splitDumperConfig);
}
} catch (final SQLException ex) {
- throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", dumperConfiguration.getTableName(), primaryKey), ex);
+ throw new PrepareFailedException(String.format("Split task for table %s by primary key %s error", inventoryDumperConfiguration.getTableName(), primaryKey), ex);
}
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
index 6972d8e..96b5bd2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/DefaultSyncTaskFactory.java
@@ -17,7 +17,9 @@
package org.apache.shardingsphere.scaling.core.job.task;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
@@ -36,12 +38,12 @@ public final class DefaultSyncTaskFactory implements SyncTaskFactory {
}
@Override
- public InventoryDataScalingTask createInventoryDataSyncTask(final SyncConfiguration syncConfiguration) {
- return new InventoryDataScalingTask(syncConfiguration);
+ public InventoryDataScalingTask createInventoryDataSyncTask(final InventoryDumperConfiguration inventoryDumperConfiguration, final ImporterConfiguration importerConfiguration) {
+ return new InventoryDataScalingTask(inventoryDumperConfiguration, importerConfiguration);
}
@Override
- public IncrementalDataScalingTask createIncrementalDataSyncTask(final SyncConfiguration syncConfiguration) {
- return new IncrementalDataScalingTask(syncConfiguration);
+ public IncrementalDataScalingTask createIncrementalDataSyncTask(final int concurrency, final DumperConfiguration dumperConfiguration, final ImporterConfiguration importerConfiguration) {
+ return new IncrementalDataScalingTask(concurrency, dumperConfiguration, importerConfiguration);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
index e77a331..fca7928 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/SyncTaskFactory.java
@@ -17,7 +17,9 @@
package org.apache.shardingsphere.scaling.core.job.task;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalDataScalingTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryDataScalingTask;
@@ -41,16 +43,19 @@ public interface SyncTaskFactory {
/**
* Create inventory data sync task.
*
- * @param syncConfiguration sync configuration
+ * @param inventoryDumperConfiguration inventory dumper configuration
+ * @param importerConfiguration importer configuration
* @return inventory data sync task
*/
- InventoryDataScalingTask createInventoryDataSyncTask(SyncConfiguration syncConfiguration);
+ InventoryDataScalingTask createInventoryDataSyncTask(InventoryDumperConfiguration inventoryDumperConfiguration, ImporterConfiguration importerConfiguration);
/**
* Create incremental data sync task.
*
- * @param syncConfiguration sync configuration
+ * @param concurrency concurrency
+ * @param dumperConfiguration dumper configuration
+ * @param importerConfiguration importer configuration
* @return incremental data sync task
*/
- IncrementalDataScalingTask createIncrementalDataSyncTask(SyncConfiguration syncConfiguration);
+ IncrementalDataScalingTask createIncrementalDataSyncTask(int concurrency, DumperConfiguration dumperConfiguration, ImporterConfiguration importerConfiguration);
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
index 8979cf6..fb0b24d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalDataScalingTask.java
@@ -18,8 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.task.incremental;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
@@ -46,7 +47,11 @@ import java.util.concurrent.Future;
@Slf4j
public final class IncrementalDataScalingTask extends AbstractShardingScalingExecutor<IncrementalPosition> implements ScalingTask<IncrementalPosition> {
- private final SyncConfiguration syncConfiguration;
+ private final int concurrency;
+
+ private final DumperConfiguration dumperConfiguration;
+
+ private final ImporterConfiguration importerConfiguration;
private final DataSourceManager dataSourceManager;
@@ -55,17 +60,18 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
private long delayMillisecond;
@SuppressWarnings("unchecked")
- public IncrementalDataScalingTask(final SyncConfiguration syncConfiguration) {
- this.syncConfiguration = syncConfiguration;
+ public IncrementalDataScalingTask(final int concurrency, final DumperConfiguration dumperConfiguration, final ImporterConfiguration importerConfiguration) {
+ this.concurrency = concurrency;
+ this.dumperConfiguration = dumperConfiguration;
+ this.importerConfiguration = importerConfiguration;
dataSourceManager = new DataSourceManager();
- setTaskId(syncConfiguration.getDumperConfiguration().getDataSourceName());
- setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
+ setTaskId(dumperConfiguration.getDataSourceName());
+ setPositionManager(dumperConfiguration.getPositionManager());
}
@Override
public void start() {
- syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap());
- dumper = DumperFactory.newInstanceLogDumper(syncConfiguration.getDumperConfiguration(), getPositionManager().getPosition());
+ dumper = DumperFactory.newInstanceLogDumper(dumperConfiguration, getPositionManager().getPosition());
Collection<Importer> importers = instanceImporters();
instanceChannel(importers);
Future<?> future = ScalingContext.getInstance().getTaskExecuteEngine().submitAll(importers, new ExecuteCallback() {
@@ -86,9 +92,9 @@ public final class IncrementalDataScalingTask extends AbstractShardingScalingExe
}
private List<Importer> instanceImporters() {
- List<Importer> result = new ArrayList<>(syncConfiguration.getConcurrency());
- for (int i = 0; i < syncConfiguration.getConcurrency(); i++) {
- result.add(ImporterFactory.newInstance(syncConfiguration.getImporterConfiguration(), dataSourceManager));
+ List<Importer> result = new ArrayList<>(concurrency);
+ for (int i = 0; i < concurrency; i++) {
+ result.add(ImporterFactory.newInstance(importerConfiguration, dataSourceManager));
}
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index c7ed32a..236dbf1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -18,10 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.task.inventory;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
-import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
@@ -37,6 +36,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -52,7 +52,9 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
public final class InventoryDataScalingTask extends AbstractShardingScalingExecutor<InventoryPosition> implements ScalingTask<InventoryPosition> {
- private final SyncConfiguration syncConfiguration;
+ private final InventoryDumperConfiguration inventoryDumperConfiguration;
+
+ private final ImporterConfiguration importerConfiguration;
private final DataSourceManager dataSourceManager;
@@ -62,28 +64,29 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
private Dumper dumper;
- public InventoryDataScalingTask(final SyncConfiguration syncConfiguration) {
- this(syncConfiguration, new DataSourceManager());
+ public InventoryDataScalingTask(final InventoryDumperConfiguration inventoryDumperConfiguration, final ImporterConfiguration importerConfiguration) {
+ this(inventoryDumperConfiguration, importerConfiguration, new DataSourceManager());
}
@SuppressWarnings("unchecked")
- public InventoryDataScalingTask(final SyncConfiguration syncConfiguration, final DataSourceManager dataSourceManager) {
- this.syncConfiguration = syncConfiguration;
+ public InventoryDataScalingTask(final InventoryDumperConfiguration inventoryDumperConfiguration, final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
+ this.inventoryDumperConfiguration = inventoryDumperConfiguration;
+ this.importerConfiguration = importerConfiguration;
this.dataSourceManager = dataSourceManager;
- setTaskId(generateSyncTaskId(syncConfiguration.getDumperConfiguration()));
- setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
+ setTaskId(generateSyncTaskId(inventoryDumperConfiguration));
+ setPositionManager(inventoryDumperConfiguration.getPositionManager());
}
- private String generateSyncTaskId(final RdbmsConfiguration dumperConfiguration) {
- String result = String.format("%s.%s", dumperConfiguration.getDataSourceName(), dumperConfiguration.getTableName());
- return null == dumperConfiguration.getSpiltNum() ? result : result + "#" + dumperConfiguration.getSpiltNum();
+ private String generateSyncTaskId(final InventoryDumperConfiguration inventoryDumperConfiguration) {
+ String result = String.format("%s.%s", inventoryDumperConfiguration.getDataSourceName(), inventoryDumperConfiguration.getTableName());
+ return null == inventoryDumperConfiguration.getSpiltNum() ? result : result + "#" + inventoryDumperConfiguration.getSpiltNum();
}
@Override
public void start() {
getEstimatedRows();
instanceDumper();
- Importer importer = ImporterFactory.newInstance(syncConfiguration.getImporterConfiguration(), dataSourceManager);
+ Importer importer = ImporterFactory.newInstance(importerConfiguration, dataSourceManager);
instanceChannel(importer);
Future<?> future = ScalingContext.getInstance().getImporterExecuteEngine().submit(importer, new ExecuteCallback() {
@@ -103,11 +106,11 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
}
private void getEstimatedRows() {
- DataSource dataSource = dataSourceManager.getDataSource(syncConfiguration.getDumperConfiguration().getDataSourceConfiguration());
+ DataSource dataSource = dataSourceManager.getDataSource(inventoryDumperConfiguration.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection()) {
ResultSet resultSet = connection.prepareStatement(String.format("SELECT COUNT(*) FROM %s %s",
- syncConfiguration.getDumperConfiguration().getTableName(),
- RdbmsConfigurationUtil.getWhereCondition(syncConfiguration.getDumperConfiguration())))
+ inventoryDumperConfiguration.getTableName(),
+ RdbmsConfigurationUtil.getWhereCondition(inventoryDumperConfiguration)))
.executeQuery();
resultSet.next();
estimatedRows = resultSet.getInt(1);
@@ -117,8 +120,7 @@ public final class InventoryDataScalingTask extends AbstractShardingScalingExecu
}
private void instanceDumper() {
- syncConfiguration.getDumperConfiguration().setTableNameMap(syncConfiguration.getTableNameMap());
- dumper = DumperFactory.newInstanceJdbcDumper(syncConfiguration.getDumperConfiguration(), dataSourceManager);
+ dumper = DumperFactory.newInstanceJdbcDumper(inventoryDumperConfiguration, dataSourceManager);
}
private void instanceChannel(final Importer importer) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
index 5426e42..a0abb07 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
@@ -32,12 +32,12 @@ public final class RdbmsConfigurationUtil {
/**
* Get SQL where condition whit primary key.
*
- * @param rdbmsConfiguration rdbms configuration
+ * @param inventoryDumperConfiguration rdbms configuration
* @return SQL where condition
*/
@SuppressWarnings("unchecked")
- public static String getWhereCondition(final RdbmsConfiguration rdbmsConfiguration) {
- return getWhereCondition(rdbmsConfiguration.getPrimaryKey(), rdbmsConfiguration.getPositionManager());
+ public static String getWhereCondition(final InventoryDumperConfiguration inventoryDumperConfiguration) {
+ return getWhereCondition(inventoryDumperConfiguration.getPrimaryKey(), inventoryDumperConfiguration.getPositionManager());
}
private static String getWhereCondition(final String primaryKey, final PositionManager<PrimaryKeyPosition> positionManager) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
index 89b9f6b..3c2d804 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
@@ -22,9 +22,10 @@ import com.google.common.collect.Sets;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.config.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.sharding.algorithm.sharding.inline.InlineExpressionParser;
@@ -64,10 +65,10 @@ public final class SyncConfigurationUtil {
Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(sourceRule, sourceDatasource.keySet());
filterByShardingDataSourceTables(dataSourceTableNameMap, scalingConfiguration.getJobConfiguration());
for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
- RdbmsConfiguration dumperConfiguration = createDumperConfiguration(entry.getKey(), sourceDatasource.get(entry.getKey()));
- dumperConfiguration.setRetryTimes(scalingConfiguration.getJobConfiguration().getRetryTimes());
- RdbmsConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule);
- result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), entry.getValue(), dumperConfiguration, importerConfiguration));
+ DumperConfiguration dumperConfiguration = createDumperConfiguration(entry.getKey(), sourceDatasource.get(entry.getKey()), entry.getValue());
+ ImporterConfiguration importerConfiguration = createImporterConfiguration(scalingConfiguration, sourceRule);
+ importerConfiguration.setRetryTimes(scalingConfiguration.getJobConfiguration().getRetryTimes());
+ result.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), dumperConfiguration, importerConfiguration));
}
return result;
}
@@ -147,19 +148,20 @@ public final class SyncConfigurationUtil {
}
}
- private static RdbmsConfiguration createDumperConfiguration(final String dataSourceName, final DataSourceConfiguration dataSourceConfiguration) {
- RdbmsConfiguration result = new RdbmsConfiguration();
+ private static DumperConfiguration createDumperConfiguration(final String dataSourceName, final DataSourceConfiguration dataSourceConfiguration, final Map<String, String> tableMap) {
+ DumperConfiguration result = new DumperConfiguration();
result.setDataSourceName(dataSourceName);
Map<String, Object> dataSourceProperties = dataSourceConfiguration.getProps();
JDBCDataSourceConfiguration dumperDataSourceConfiguration = new JDBCDataSourceConfiguration(
dataSourceProperties.containsKey("jdbcUrl") ? dataSourceProperties.get("jdbcUrl").toString() : dataSourceProperties.get("url").toString(),
dataSourceProperties.get("username").toString(), dataSourceProperties.get("password").toString());
result.setDataSourceConfiguration(dumperDataSourceConfiguration);
+ result.setTableNameMap(tableMap);
return result;
}
- private static RdbmsConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration, final ShardingRuleConfiguration shardingRuleConfig) {
- RdbmsConfiguration result = new RdbmsConfiguration();
+ private static ImporterConfiguration createImporterConfiguration(final ScalingConfiguration scalingConfiguration, final ShardingRuleConfiguration shardingRuleConfig) {
+ ImporterConfiguration result = new ImporterConfiguration();
JDBCDataSourceConfiguration importerDataSourceConfiguration = new JDBCDataSourceConfiguration(
scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUrl(),
scalingConfiguration.getRuleConfiguration().getDestinationDataSources().getUsername(),
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
deleted file mode 100644
index fc71efc..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/config/RdbmsConfigurationTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.config;
-
-import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
-import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionManager;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-import org.junit.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertThat;
-
-public final class RdbmsConfigurationTest {
-
- @Test
- public void assertClone() {
- RdbmsConfiguration origin = new RdbmsConfiguration();
- RdbmsConfiguration clone = RdbmsConfiguration.clone(origin);
- assertThat(clone, is(origin));
- origin.setTableName("t1");
- assertNotSame(origin, clone);
- }
-
- @Test
- public void assertGetWhereCondition() {
- RdbmsConfiguration rdbmsConfiguration = new RdbmsConfiguration();
- assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is(""));
- rdbmsConfiguration.setPrimaryKey("id");
- rdbmsConfiguration.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(0, 10)));
- assertThat(RdbmsConfigurationUtil.getWhereCondition(rdbmsConfiguration), is("WHERE id BETWEEN 0 AND 10"));
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 4ea0f90..0bc8161 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
@@ -85,7 +85,7 @@ public final class AbstractJDBCImporterTest {
@Before
public void setUp() throws SQLException {
- jdbcImporter = new AbstractJDBCImporter(getRdbmsConfiguration(), dataSourceManager) {
+ jdbcImporter = new AbstractJDBCImporter(getImporterConfiguration(), dataSourceManager) {
@Override
protected AbstractSqlBuilder createSqlBuilder() {
@@ -158,9 +158,8 @@ public final class AbstractJDBCImporterTest {
return result;
}
- private RdbmsConfiguration getRdbmsConfiguration() {
- RdbmsConfiguration result = new RdbmsConfiguration();
- result.setTableName(TABLE_NAME);
+ private ImporterConfiguration getImporterConfiguration() {
+ ImporterConfiguration result = new ImporterConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration);
Map<String, Set<String>> shardingColumnsMap = Maps.newHashMap();
shardingColumnsMap.put("test_table", Sets.newHashSet("user"));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2JDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2JDBCDumper.java
index 122c2e8..a29b8cb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2JDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2JDBCDumper.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.scaling.core.fixture;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
-import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -27,8 +27,8 @@ import java.sql.SQLException;
public final class FixtureH2JDBCDumper extends AbstractJDBCDumper {
- public FixtureH2JDBCDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- super(rdbmsConfiguration, dataSourceManager);
+ public FixtureH2JDBCDumper(final InventoryDumperConfiguration dumperConfiguration, final DataSourceManager dataSourceManager) {
+ super(dumperConfiguration, dataSourceManager);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopImporter.java
index 8f76b83..86fd2de 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureNopImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.fixture;
import java.util.List;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
@@ -32,7 +32,7 @@ public final class FixtureNopImporter implements Importer {
private Channel channel;
- public FixtureNopImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
+ public FixtureNopImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
index f537ec1..e7c0aee 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumerTest.java
@@ -18,8 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.preparer.resumer;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
@@ -83,18 +84,19 @@ public final class SyncPositionResumerTest {
}
private SyncConfiguration mockSyncConfiguration() {
- RdbmsConfiguration dumperConfig = mockDumperConfig();
- RdbmsConfiguration importerConfig = new RdbmsConfiguration();
- Map<String, String> tableMap = new HashMap<>();
- tableMap.put("t_order", "t_order");
- return new SyncConfiguration(3, tableMap, dumperConfig, importerConfig);
+ DumperConfiguration dumperConfig = mockDumperConfig();
+ ImporterConfiguration importerConfig = new ImporterConfiguration();
+ return new SyncConfiguration(3, dumperConfig, importerConfig);
}
- private RdbmsConfiguration mockDumperConfig() {
+ private DumperConfiguration mockDumperConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
- RdbmsConfiguration result = new RdbmsConfiguration();
+ DumperConfiguration result = new DumperConfiguration();
result.setDataSourceName("ds0");
result.setDataSourceConfiguration(dataSourceConfiguration);
+ Map<String, String> tableMap = new HashMap<>();
+ tableMap.put("t_order", "t_order");
+ result.setTableNameMap(tableMap);
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
index e30324f..943e2bd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
@@ -18,8 +18,9 @@
package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
@@ -56,12 +57,9 @@ public final class InventoryDataTaskSplitterTest {
@Before
public void setUp() {
- RdbmsConfiguration dumperConfig = mockDumperConfig();
- RdbmsConfiguration importerConfig = new RdbmsConfiguration();
- Map<String, String> tableMap = new HashMap<>();
- tableMap.put("t_order", "t_order");
- syncConfiguration = new SyncConfiguration(3, tableMap,
- dumperConfig, importerConfig);
+ DumperConfiguration dumperConfig = mockDumperConfig();
+ ImporterConfiguration importerConfig = new ImporterConfiguration();
+ syncConfiguration = new SyncConfiguration(3, dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager();
inventoryDataTaskSplitter = new InventoryDataTaskSplitter();
}
@@ -103,7 +101,7 @@ public final class InventoryDataTaskSplitterTest {
assertThat(actual.size(), is(1));
}
- private void initIntPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException {
+ private void initIntPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
@@ -113,7 +111,7 @@ public final class InventoryDataTaskSplitterTest {
}
}
- private void initCharPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException {
+ private void initCharPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
@@ -123,7 +121,7 @@ public final class InventoryDataTaskSplitterTest {
}
}
- private void initUnionPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException {
+ private void initUnionPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
@@ -133,7 +131,7 @@ public final class InventoryDataTaskSplitterTest {
}
}
- private void initNoPrimaryEnvironment(final RdbmsConfiguration dumperConfig) throws SQLException {
+ private void initNoPrimaryEnvironment(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
@@ -143,10 +141,13 @@ public final class InventoryDataTaskSplitterTest {
}
}
- private RdbmsConfiguration mockDumperConfig() {
+ private DumperConfiguration mockDumperConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
- RdbmsConfiguration result = new RdbmsConfiguration();
+ DumperConfiguration result = new DumperConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration);
+ Map<String, String> tableMap = new HashMap<>();
+ tableMap.put("t_order", "t_order");
+ result.setTableNameMap(tableMap);
return result;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index 1c0189c..2a54c34 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -18,8 +18,10 @@
package org.apache.shardingsphere.scaling.core.job.task.inventory;
import org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
@@ -54,10 +56,10 @@ public final class InventoryDataScalingTaskTest {
@Before
public void setUp() {
- RdbmsConfiguration dumperConfig = mockDumperConfig();
- RdbmsConfiguration importerConfig = mockImporterConfig();
+ DumperConfiguration dumperConfig = mockDumperConfig();
+ ImporterConfiguration importerConfig = mockImporterConfig();
ScalingContext.getInstance().init(new ServerConfiguration());
- syncConfiguration = new SyncConfiguration(3, Collections.emptyMap(), dumperConfig, importerConfig);
+ syncConfiguration = new SyncConfiguration(3, dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager();
}
@@ -68,20 +70,26 @@ public final class InventoryDataScalingTaskTest {
@Test(expected = SyncTaskExecuteException.class)
public void assertStartWithGetEstimatedRowsFailure() {
- syncConfiguration.getDumperConfiguration().setTableName("t_non_exist");
- InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(syncConfiguration, dataSourceManager);
+ InventoryDumperConfiguration inventoryDumperConfiguration = new InventoryDumperConfiguration(syncConfiguration.getDumperConfiguration());
+ inventoryDumperConfiguration.setTableName("t_non_exist");
+ InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(
+ inventoryDumperConfiguration, syncConfiguration.getImporterConfiguration(), dataSourceManager);
inventoryDataSyncTask.start();
}
@Test
public void assertGetProgress() throws SQLException {
initTableData(syncConfiguration.getDumperConfiguration());
- InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(syncConfiguration, dataSourceManager);
+ InventoryDumperConfiguration inventoryDumperConfiguration = new InventoryDumperConfiguration(syncConfiguration.getDumperConfiguration());
+ inventoryDumperConfiguration.setTableName("t_order");
+ inventoryDumperConfiguration.setPositionManager(syncConfiguration.getDumperConfiguration().getPositionManager());
+ InventoryDataScalingTask inventoryDataSyncTask = new InventoryDataScalingTask(
+ inventoryDumperConfiguration, syncConfiguration.getImporterConfiguration(), dataSourceManager);
inventoryDataSyncTask.start();
assertThat(((InventoryDataSyncTaskProgress) inventoryDataSyncTask.getProgress()).getEstimatedRows(), is(2L));
}
- private void initTableData(final RdbmsConfiguration dumperConfig) throws SQLException {
+ private void initTableData(final DumperConfiguration dumperConfig) throws SQLException {
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfiguration());
try (Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
@@ -91,18 +99,18 @@ public final class InventoryDataScalingTaskTest {
}
}
- private RdbmsConfiguration mockDumperConfig() {
+ private DumperConfiguration mockDumperConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
- RdbmsConfiguration result = new RdbmsConfiguration();
+ DumperConfiguration result = new DumperConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration);
- result.setTableName("t_order");
result.setPositionManager(new InventoryPositionManager<>(new PrimaryKeyPosition(1, 100)));
+ result.setTableNameMap(Collections.emptyMap());
return result;
}
- private RdbmsConfiguration mockImporterConfig() {
+ private ImporterConfiguration mockImporterConfig() {
DataSourceConfiguration dataSourceConfiguration = new JDBCDataSourceConfiguration(DATA_SOURCE_URL, USERNAME, PASSWORD);
- RdbmsConfiguration result = new RdbmsConfiguration();
+ ImporterConfiguration result = new ImporterConfiguration();
result.setDataSourceConfiguration(dataSourceConfiguration);
return result;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/SyncConfigurationUtilTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/SyncConfigurationUtilTest.java
index f4624b2..d729cb6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/SyncConfigurationUtilTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/SyncConfigurationUtilTest.java
@@ -46,7 +46,7 @@ public final class SyncConfigurationUtilTest {
@Test
public void assertFilterByShardingDataSourceTables() {
List<SyncConfiguration> syncConfigurations = (List<SyncConfiguration>) SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration);
- assertThat(syncConfigurations.get(0).getTableNameMap().size(), is(1));
+ assertThat(syncConfigurations.get(0).getDumperConfiguration().getTableNameMap().size(), is(1));
}
private void initConfig(final String configFile) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
index 62b2e83..718bdbe 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLBinlogDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.mysql;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
@@ -59,7 +59,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
private final BinlogPosition binlogPosition;
- private final RdbmsConfiguration rdbmsConfiguration;
+ private final DumperConfiguration dumperConfiguration;
private final MetaDataManager metaDataManager;
@@ -68,13 +68,13 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
@Setter
private Channel channel;
- public MySQLBinlogDumper(final RdbmsConfiguration rdbmsConfiguration, final Position binlogPosition) {
+ public MySQLBinlogDumper(final DumperConfiguration dumperConfiguration, final Position binlogPosition) {
this.binlogPosition = (BinlogPosition) binlogPosition;
- if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
+ if (!JDBCDataSourceConfiguration.class.equals(dumperConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("MySQLBinlogDumper only support JDBCDataSourceConfiguration");
}
- this.rdbmsConfiguration = rdbmsConfiguration;
- metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(rdbmsConfiguration.getDataSourceConfiguration()));
+ this.dumperConfiguration = dumperConfiguration;
+ metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(dumperConfiguration.getDataSourceConfiguration()));
}
@Override
@@ -84,7 +84,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
}
private void dump() {
- JDBCDataSourceConfiguration jdbcDataSourceConfig = (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration();
+ JDBCDataSourceConfiguration jdbcDataSourceConfig = (JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration();
JdbcUri uri = new JdbcUri(jdbcDataSourceConfig.getJdbcUrl());
MySQLClient client = new MySQLClient(new ConnectInfo(random.nextInt(), uri.getHostname(), uri.getPort(), jdbcDataSourceConfig.getUsername(), jdbcDataSourceConfig.getPassword()));
client.connect();
@@ -164,7 +164,7 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
private DataRecord createDataRecord(final AbstractRowsEvent rowsEvent, final int columnCount) {
DataRecord result = new DataRecord(new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()), columnCount);
- result.setTableName(rdbmsConfiguration.getTableNameMap().get(rowsEvent.getTableName()));
+ result.setTableName(dumperConfiguration.getTableNameMap().get(rowsEvent.getTableName()));
result.setCommitTime(rowsEvent.getTimestamp() * 1000);
return result;
}
@@ -183,6 +183,6 @@ public final class MySQLBinlogDumper extends AbstractShardingScalingExecutor<Bin
}
private boolean filter(final String database, final String schemaName, final String tableName) {
- return !schemaName.equals(database) || !rdbmsConfiguration.getTableNameMap().containsKey(tableName);
+ return !schemaName.equals(database) || !dumperConfiguration.getTableNameMap().containsKey(tableName);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
index 521618c..7038b7c 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.mysql;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
@@ -27,8 +27,8 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Abstract
*/
public final class MySQLImporter extends AbstractJDBCImporter {
- public MySQLImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- super(rdbmsConfiguration, dataSourceManager);
+ public MySQLImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
+ super(importerConfiguration, dataSourceManager);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
index ce507ba..5dda1d3 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLJdbcDumper.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.scaling.mysql;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
@@ -36,9 +36,9 @@ import java.util.Map.Entry;
*/
public final class MySQLJdbcDumper extends AbstractJDBCDumper {
- public MySQLJdbcDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- super(rdbmsConfiguration, dataSourceManager);
- JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) getRdbmsConfiguration().getDataSourceConfiguration();
+ public MySQLJdbcDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
+ super(inventoryDumperConfiguration, dataSourceManager);
+ JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) getInventoryDumperConfiguration().getDataSourceConfiguration();
jdbcDataSourceConfiguration.setJdbcUrl(fixMySQLUrl(jdbcDataSourceConfiguration.getJdbcUrl()));
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
index 5995228..f28c2da 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSqlBuilder;
@@ -27,8 +27,8 @@ import org.apache.shardingsphere.scaling.core.execute.executor.importer.Abstract
*/
public final class PostgreSQLImporter extends AbstractJDBCImporter {
- public PostgreSQLImporter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- super(rdbmsConfiguration, dataSourceManager);
+ public PostgreSQLImporter(final ImporterConfiguration importerConfiguration, final DataSourceManager dataSourceManager) {
+ super(importerConfiguration, dataSourceManager);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
index 99d5f58..a29f0c2 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLJdbcDumper.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.scaling.postgresql;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
-import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
+import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -31,8 +31,8 @@ import java.sql.SQLException;
*/
public final class PostgreSQLJdbcDumper extends AbstractJDBCDumper {
- public PostgreSQLJdbcDumper(final RdbmsConfiguration rdbmsConfiguration, final DataSourceManager dataSourceManager) {
- super(rdbmsConfiguration, dataSourceManager);
+ public PostgreSQLJdbcDumper(final InventoryDumperConfiguration inventoryDumperConfiguration, final DataSourceManager dataSourceManager) {
+ super(inventoryDumperConfiguration, dataSourceManager);
}
@Override
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
index 320a915..09840e1 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLWalDumper.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.scaling.postgresql;
import lombok.Setter;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
@@ -47,7 +47,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
private final WalPosition walPosition;
- private final RdbmsConfiguration rdbmsConfiguration;
+ private final DumperConfiguration dumperConfiguration;
private final LogicalReplication logicalReplication = new LogicalReplication();
@@ -56,13 +56,13 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
@Setter
private Channel channel;
- public PostgreSQLWalDumper(final RdbmsConfiguration rdbmsConfiguration, final Position position) {
+ public PostgreSQLWalDumper(final DumperConfiguration dumperConfiguration, final Position position) {
walPosition = (WalPosition) position;
- if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
+ if (!JDBCDataSourceConfiguration.class.equals(dumperConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration");
}
- this.rdbmsConfiguration = rdbmsConfiguration;
- walEventConverter = new WalEventConverter(rdbmsConfiguration);
+ this.dumperConfiguration = dumperConfiguration;
+ walEventConverter = new WalEventConverter(dumperConfiguration);
}
@Override
@@ -73,7 +73,7 @@ public final class PostgreSQLWalDumper extends AbstractShardingScalingExecutor<W
private void dump() {
try {
- PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration());
+ PGConnection pgConnection = logicalReplication.createPgConnection((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(((Connection) pgConnection).unwrap(PgConnection.class).getTimestampUtils());
PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection,
PostgreSQLPositionManager.SLOT_NAME, walPosition.getLogSequenceNumber());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverter.java b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverter.java
index 3097de0..8c62837 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/WalEventConverter.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.scaling.postgresql.wal;
+import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.JDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
@@ -42,13 +42,13 @@ import java.util.List;
*/
public final class WalEventConverter {
- private final RdbmsConfiguration rdbmsConfiguration;
+ private final DumperConfiguration dumperConfiguration;
private final MetaDataManager metaDataManager;
- public WalEventConverter(final RdbmsConfiguration rdbmsConfiguration) {
- this.rdbmsConfiguration = rdbmsConfiguration;
- metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(rdbmsConfiguration.getDataSourceConfiguration()));
+ public WalEventConverter(final DumperConfiguration dumperConfiguration) {
+ this.dumperConfiguration = dumperConfiguration;
+ metaDataManager = new MetaDataManager(new DataSourceFactory().newInstance(dumperConfiguration.getDataSourceConfiguration()));
}
/**
@@ -58,7 +58,7 @@ public final class WalEventConverter {
* @return record
*/
public Record convert(final AbstractWalEvent event) {
- JdbcUri uri = new JdbcUri(((JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration()).getJdbcUrl());
+ JdbcUri uri = new JdbcUri(((JDBCDataSourceConfiguration) dumperConfiguration.getDataSourceConfiguration()).getJdbcUrl());
if (filter(uri.getDatabase(), event)) {
return createPlaceholderRecord(event);
} else if (event instanceof WriteRowEvent) {
@@ -76,7 +76,7 @@ public final class WalEventConverter {
private boolean filter(final String database, final AbstractWalEvent event) {
if (isRowEvent(event)) {
AbstractRowEvent rowEvent = (AbstractRowEvent) event;
- return !rowEvent.getSchemaName().equals(database) || !rdbmsConfiguration.getTableNameMap().containsKey(rowEvent.getTableName());
+ return !rowEvent.getSchemaName().equals(database) || !dumperConfiguration.getTableNameMap().containsKey(rowEvent.getTableName());
}
return false;
}
@@ -118,7 +118,7 @@ public final class WalEventConverter {
private DataRecord createDataRecord(final AbstractRowEvent rowsEvent, final int columnCount) {
DataRecord result = new DataRecord(new WalPosition(rowsEvent.getLogSequenceNumber()), columnCount);
- result.setTableName(rdbmsConfiguration.getTableNameMap().get(rowsEvent.getTableName()));
+ result.setTableName(dumperConfiguration.getTableNameMap().get(rowsEvent.getTableName()));
return result;
}