You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/02/02 08:40:26 UTC
[shardingsphere] branch master updated: Optimize some utils. (#9263)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a46d7a3 Optimize some utils. (#9263)
a46d7a3 is described below
commit a46d7a3a86d6fb90c700bcb8fb1f88c4212d6621
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Feb 2 16:39:54 2021 +0800
Optimize some utils. (#9263)
* rename package utils to util
* remove useless function
* Optimize some utils.
Co-authored-by: qiulu3 <Lucas209910>
---
...ava => ServerConfigurationInitializerTest.java} | 2 +-
.../scaling/web/HttpServerHandlerTest.java | 2 +-
.../scaling/web/HttpServerInitializerTest.java | 2 +-
.../scaling/core/api/ScalingAPIFactory.java | 42 ++++
.../scaling/core/api/impl/ScalingAPIImpl.java | 2 +-
.../execute/executor/channel/MemoryChannel.java | 2 +-
.../channel/bitset/BlockingQueueChannel.java | 2 +-
.../executor/dumper/AbstractJDBCDumper.java | 11 +-
.../executor/importer/AbstractJDBCImporter.java | 2 +-
.../executor/job/FinishedCheckJobExecutor.java | 4 +-
.../execute/executor/job/ScalingJobExecutor.java | 3 +-
.../scaling/core/job/FinishedCheckJob.java | 4 +-
.../scaling/core/job/JobContext.java | 5 +-
.../job/position/PositionInitializerFactory.java | 2 +-
.../core/schedule/ScalingTaskScheduler.java | 2 +-
.../scaling/core/{utils => util}/JDBCUtil.java | 6 +-
.../JobConfigurationUtil.java} | 269 ++++++++++++---------
.../core/{utils => util}/ReflectionUtil.java | 57 +----
.../core/{utils => util}/ScalingTaskUtil.java | 41 ++--
.../scaling/core/{utils => util}/ThreadUtil.java | 2 +-
.../scaling/core/utils/ElasticJobUtil.java | 64 -----
.../scaling/core/utils/JobConfigurationUtil.java | 181 --------------
.../scaling/core/utils/RdbmsConfigurationUtil.java | 49 ----
.../scaling/core/utils/ShardingColumnsUtil.java | 41 ----
.../core/datasource/DataSourceManagerTest.java | 8 +-
.../executor/channel/bitset/ManualBitSetTest.java | 2 +-
.../check/AbstractDataConsistencyCheckerTest.java | 4 +-
.../splitter/InventoryTaskSplitterTest.java | 5 +-
.../scaling/core/util/JDBCUtilTest.java | 1 -
.../scaling/core/util/JobConfigurationUtil.java | 60 -----
.../scaling/mysql/component/MySQLImporter.java | 2 +-
.../scaling/mysql/component/MySQLJdbcDumper.java | 2 +-
.../mysql/component/MySQLScalingSQLBuilder.java | 10 +-
.../scaling/mysql/client/MySQLClientTest.java | 2 +-
.../netty/MySQLBinlogEventPacketDecoderTest.java | 2 +-
.../netty/MySQLCommandPacketDecoderTest.java | 2 +-
.../client/netty/MySQLNegotiateHandlerTest.java | 2 +-
.../mysql/component/MySQLBinlogDumperTest.java | 2 +-
.../postgresql/component/PostgreSQLWalDumper.java | 2 +-
.../component/PostgreSQLWalDumperTest.java | 2 +-
40 files changed, 275 insertions(+), 630 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ServerConfigurationUtilTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ServerConfigurationInitializerTest.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ServerConfigurationUtilTest.java
rename to shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ServerConfigurationInitializerTest.java
index 31930b5..94071cc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ServerConfigurationUtilTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/util/ServerConfigurationInitializerTest.java
@@ -22,7 +22,7 @@ import org.junit.Test;
import static org.junit.Assert.assertNotNull;
-public final class ServerConfigurationUtilTest {
+public final class ServerConfigurationInitializerTest {
@Test
public void assertInitScalingConfig() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
index 46561e5..885f3e9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerHandlerTest.java
@@ -36,7 +36,7 @@ import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.util.ServerConfigurationInitializer;
import org.apache.shardingsphere.scaling.web.entity.ResponseContent;
import org.junit.Before;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
index 0103f54..20359b1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/web/HttpServerInitializerTest.java
@@ -23,7 +23,7 @@ import io.netty.channel.socket.SocketChannel;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
index 61f56c3..913c1d4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPIFactory.java
@@ -18,10 +18,14 @@
package org.apache.shardingsphere.scaling.core.api;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import lombok.Getter;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
@@ -34,6 +38,8 @@ import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import java.util.Properties;
+
/**
* Scaling API factory.
*/
@@ -81,6 +87,15 @@ public final class ScalingAPIFactory {
return ElasticJobAPIHolder.INSTANCE.getJobConfigurationAPI();
}
+ /**
+ * Get registry center.
+ *
+ * @return Coordinator registry center
+ */
+ public static CoordinatorRegistryCenter getRegistryCenter() {
+ return RegistryCenterHolder.INSTANCE;
+ }
+
private static final class ScalingAPIHolder {
private static final ScalingAPI INSTANCE = new ScalingAPIImpl();
@@ -117,4 +132,31 @@ public final class ScalingAPIFactory {
jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(governanceConfig.getRegistryCenterConfiguration().getServerLists(), namespace, null);
}
}
+
+ private static final class RegistryCenterHolder {
+
+ private static final CoordinatorRegistryCenter INSTANCE = createRegistryCenter();
+
+ private static CoordinatorRegistryCenter createRegistryCenter() {
+ CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(getZookeeperConfig());
+ result.init();
+ return result;
+ }
+
+ private static ZookeeperConfiguration getZookeeperConfig() {
+ GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
+ ZookeeperConfiguration result = new ZookeeperConfiguration(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
+ governanceConfig.getName() + ScalingConstant.SCALING_ROOT);
+ Properties props = governanceConfig.getRegistryCenterConfiguration().getProps();
+ result.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", result.getMaxSleepTimeMilliseconds()));
+ result.setBaseSleepTimeMilliseconds(getProperty(props, "base.sleep.time.milliseconds", result.getBaseSleepTimeMilliseconds()));
+ result.setConnectionTimeoutMilliseconds(getProperty(props, "connection.timeout.milliseconds", result.getConnectionTimeoutMilliseconds()));
+ result.setSessionTimeoutMilliseconds(getProperty(props, "session.timeout.milliseconds", result.getSessionTimeoutMilliseconds()));
+ return result;
+ }
+
+ private static int getProperty(final Properties props, final String key, final int defaultValue) {
+ return Strings.isNullOrEmpty(props.getProperty(key)) ? defaultValue : Integer.parseInt(props.getProperty(key));
+ }
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index 203fcc0..581fd66 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -36,7 +36,7 @@ import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFa
import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironmentalManager;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
-import org.apache.shardingsphere.scaling.core.utils.JobConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import java.sql.SQLException;
import java.util.HashMap;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
index bc1185b..088761d 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/MemoryChannel.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.channel;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import java.util.ArrayList;
import java.util.LinkedList;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/BlockingQueueChannel.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/BlockingQueueChannel.java
index a99cb43..026c7b9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/BlockingQueueChannel.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/BlockingQueueChannel.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.channel.bitset;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import java.util.ArrayList;
import java.util.List;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index 61a8648..622f32e 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -38,7 +38,6 @@ import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.position.Position;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.metadata.MetaDataManager;
-import org.apache.shardingsphere.scaling.core.utils.RdbmsConfigurationUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -84,7 +83,7 @@ public abstract class AbstractJDBCDumper extends AbstractScalingExecutor impleme
private void dump() {
try (Connection conn = dataSourceManager.getDataSource(inventoryDumperConfig.getDataSourceConfig()).getConnection()) {
- String sql = String.format("SELECT * FROM %s %s", inventoryDumperConfig.getTableName(), RdbmsConfigurationUtil.getWhereCondition(inventoryDumperConfig));
+ String sql = String.format("SELECT * FROM %s %s", inventoryDumperConfig.getTableName(), getWhereCondition(inventoryDumperConfig.getPrimaryKey(), inventoryDumperConfig.getPosition()));
PreparedStatement ps = createPreparedStatement(conn, sql);
ResultSet rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
@@ -107,6 +106,14 @@ public abstract class AbstractJDBCDumper extends AbstractScalingExecutor impleme
}
}
+ private String getWhereCondition(final String primaryKey, final Position<?> position) {
+ if (null == primaryKey || null == position) {
+ return "";
+ }
+ PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) position;
+ return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, primaryKeyPosition.getBeginValue(), primaryKeyPosition.getEndValue());
+ }
+
private Position<?> newPosition(final ResultSet rs) throws SQLException {
if (null == inventoryDumperConfig.getPrimaryKey()) {
return new PlaceholderPosition();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index c377c1c..018c2dc 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -33,7 +33,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDat
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
-import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import javax.sql.DataSource;
import java.sql.Connection;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
index 0da5290..ff66687 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/FinishedCheckJobExecutor.java
@@ -20,10 +20,10 @@ package org.apache.shardingsphere.scaling.core.execute.executor.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
+import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.FinishedCheckJob;
-import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
/**
* Finished check worker.
@@ -39,7 +39,7 @@ public final class FinishedCheckJobExecutor extends AbstractScalingExecutor impl
public void start() {
super.start();
log.info("start finished check worker.");
- new ScheduleJobBootstrap(ElasticJobUtil.createRegistryCenter(), new FinishedCheckJob(), createJobConfig()).schedule();
+ new ScheduleJobBootstrap(ScalingAPIFactory.getRegistryCenter(), new FinishedCheckJob(), createJobConfig()).schedule();
}
private JobConfiguration createJobConfig() {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
index 65bf861..b6c8344 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/job/ScalingJobExecutor.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.execute.executor.AbstractScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.ScalingExecutor;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.utils.ElasticJobUtil;
import java.util.Optional;
import java.util.Set;
@@ -91,7 +90,7 @@ public final class ScalingJobExecutor extends AbstractScalingExecutor implements
private void execute(final JobConfigurationPOJO jobConfigPOJO) {
if (EXECUTING_JOBS.add(jobConfigPOJO.getJobName())) {
- new OneOffJobBootstrap(ElasticJobUtil.createRegistryCenter(), new ScalingJob(), jobConfigPOJO.toJobConfiguration()).execute();
+ new OneOffJobBootstrap(ScalingAPIFactory.getRegistryCenter(), new ScalingJob(), jobConfigPOJO.toJobConfiguration()).execute();
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 6aa517a..5ea1dad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -29,8 +29,8 @@ import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
-import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
+import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import java.util.Map;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
index 0dd19bd..7cd36bc 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobContext.java
@@ -25,8 +25,7 @@ import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
-import org.apache.shardingsphere.scaling.core.utils.JobConfigurationUtil;
-import org.apache.shardingsphere.scaling.core.utils.TaskConfigurationUtil;
+import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import java.util.LinkedList;
import java.util.List;
@@ -59,6 +58,6 @@ public final class JobContext {
JobConfigurationUtil.fillInProperties(jobConfig);
jobId = jobConfig.getHandleConfig().getJobId();
shardingItem = jobConfig.getHandleConfig().getShardingItem();
- taskConfigs = TaskConfigurationUtil.toTaskConfigs(jobConfig);
+ taskConfigs = JobConfigurationUtil.toTaskConfigs(jobConfig);
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
index 33717c3..e368bc4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/position/PositionInitializerFactory.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.scaling.core.job.position;
import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
/**
* Position initializer factory.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 5ce72a2..83f8241 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTa
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
import java.util.Collection;
import java.util.stream.Collectors;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JDBCUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JDBCUtil.java
similarity index 96%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JDBCUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JDBCUtil.java
index 92f8762..c1e5d8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JDBCUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JDBCUtil.java
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.utils;
+package org.apache.shardingsphere.scaling.core.util;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.ConfigurationYamlConverter;
import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
@@ -29,8 +31,8 @@ import java.util.Map;
/**
* JDBC util.
- *
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JDBCUtil {
/**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
similarity index 83%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
index 74a3951..a7dee2a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/TaskConfigurationUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
@@ -15,15 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.utils;
+package org.apache.shardingsphere.scaling.core.util;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.gson.Gson;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
@@ -36,6 +39,7 @@ import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourc
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.algorithm.sharding.inline.InlineExpressionParser;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
@@ -46,6 +50,9 @@ import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.TableRule;
import javax.sql.DataSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -54,17 +61,148 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
- * Task configuration Util.
+ * Job configuration util.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class TaskConfigurationUtil {
+public final class JobConfigurationUtil {
+
+ private static final SnowflakeKeyGenerateAlgorithm ID_AUTO_INCREASE_GENERATOR = initIdAutoIncreaseGenerator();
+
+ private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
+ SnowflakeKeyGenerateAlgorithm result = new SnowflakeKeyGenerateAlgorithm();
+ result.init();
+ return result;
+ }
+
+ private static Long generateKey() {
+ return (Long) ID_AUTO_INCREASE_GENERATOR.generateKey();
+ }
+
+ /**
+ * Fill in properties for job configuration.
+ *
+ * @param jobConfig job configuration
+ */
+ public static void fillInProperties(final JobConfiguration jobConfig) {
+ HandleConfiguration handleConfig = jobConfig.getHandleConfig();
+ if (null == handleConfig.getJobId()) {
+ handleConfig.setJobId(generateKey());
+ }
+ if (Strings.isNullOrEmpty(handleConfig.getDatabaseType())) {
+ handleConfig.setDatabaseType(jobConfig.getRuleConfig().getSource().unwrap().getDatabaseType().getName());
+ }
+ if (null == jobConfig.getHandleConfig().getShardingTables()) {
+ handleConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(jobConfig)));
+ }
+ }
+
+ private static List<String> getShouldScalingActualDataNodes(final JobConfiguration jobConfig) {
+ ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
+ Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
+ "Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
+ ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
+ if (!(jobConfig.getRuleConfig().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
+ return getShardingRuleConfigMap(source.getRule()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
+ }
+ ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
+ return getShouldScalingActualDataNodes(getModifiedDataSources(source.getDataSource(), target.getDataSource()),
+ getShardingRuleConfigMap(source.getRule()), getShardingRuleConfigMap(target.getRule()));
+ }
+
+ private static List<String> getShouldScalingActualDataNodes(final Set<String> modifiedDataSources,
+ final Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
+ final Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
+ List<String> result = new ArrayList<>();
+ newShardingRuleConfigMap.keySet().forEach(each -> {
+ if (!oldShardingRuleConfigMap.containsKey(each)) {
+ return;
+ }
+ List<String> oldActualDataNodes = new InlineExpressionParser(oldShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
+ List<String> newActualDataNodes = new InlineExpressionParser(newShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
+ if (!CollectionUtils.isEqualCollection(oldActualDataNodes, newActualDataNodes) || includeModifiedDataSources(newActualDataNodes, modifiedDataSources)) {
+ result.add(oldShardingRuleConfigMap.get(each).getActualDataNodes());
+ }
+ });
+ return result;
+ }
+
+ private static Set<String> getModifiedDataSources(final String oldConfig, final String newConfig) {
+ Set<String> result = new HashSet<>();
+ Map<String, String> oldDataSourceUrlMap = getDataSourceUrlMap(oldConfig);
+ Map<String, String> newDataSourceUrlMap = getDataSourceUrlMap(newConfig);
+ newDataSourceUrlMap.forEach((key, value) -> {
+ if (!value.equals(oldDataSourceUrlMap.get(key))) {
+ result.add(key);
+ }
+ });
+ return result;
+ }
+
+ private static Map<String, String> getDataSourceUrlMap(final String configuration) {
+ Map<String, String> result = new HashMap<>();
+ ConfigurationYamlConverter.loadDataSourceConfigs(configuration).forEach((key, value) -> {
+ JdbcUri uri = new JdbcUri(value.getProps().getOrDefault("url", value.getProps().get("jdbcUrl")).toString());
+ result.put(key, String.format("%s/%s", uri.getHost(), uri.getDatabase()));
+ });
+ return result;
+ }
+
+ private static boolean includeModifiedDataSources(final List<String> actualDataNodes, final Set<String> modifiedDataSources) {
+ return actualDataNodes.stream().anyMatch(each -> modifiedDataSources.contains(each.split("\\.")[0]));
+ }
+
+ private static Map<String, ShardingTableRuleConfiguration> getShardingRuleConfigMap(final String configuration) {
+ ShardingRuleConfiguration oldShardingRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(configuration);
+ return oldShardingRuleConfig.getTables().stream().collect(Collectors.toMap(ShardingTableRuleConfiguration::getLogicTable, Function.identity()));
+ }
+
+ private static String[] groupByDataSource(final List<String> actualDataNodeList) {
+ List<String> result = new ArrayList<>();
+ Multimap<String, String> multiMap = getNodeMultiMap(actualDataNodeList);
+ for (String key : multiMap.keySet()) {
+ List<String> list = new ArrayList<>();
+ for (String value : multiMap.get(key)) {
+ list.add(String.format("%s.%s", key, value));
+ }
+ result.add(String.join(",", list));
+ }
+ return result.toArray(new String[0]);
+ }
+
+ private static Multimap<String, String> getNodeMultiMap(final List<String> actualDataNodeList) {
+ Multimap<String, String> result = HashMultimap.create();
+ for (String actualDataNodes : actualDataNodeList) {
+ for (String actualDataNode : actualDataNodes.split(",")) {
+ String[] nodeArray = split(actualDataNode);
+ for (String dataSource : new InlineExpressionParser(nodeArray[0]).splitAndEvaluate()) {
+ result.put(dataSource, nodeArray[1]);
+ }
+ }
+ }
+ return result;
+ }
+
+ private static String[] split(final String actualDataNode) {
+ boolean flag = true;
+ int i = 0;
+ for (; i < actualDataNode.length(); i++) {
+ char each = actualDataNode.charAt(i);
+ if (each == '{') {
+ flag = false;
+ } else if (each == '}') {
+ flag = true;
+ } else if (flag && each == '.') {
+ break;
+ }
+ }
+ return new String[]{actualDataNode.substring(0, i), actualDataNode.substring(i + 1)};
+ }
/**
* Split job configuration to task configurations.
@@ -77,12 +215,12 @@ public final class TaskConfigurationUtil {
ShardingSphereJDBCDataSourceConfiguration sourceConfig = getSourceConfig(jobConfig);
ShardingRuleConfiguration sourceRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(sourceConfig.getRule());
Map<String, DataSourceConfiguration> sourceDataSource = ConfigurationYamlConverter.loadDataSourceConfigs(sourceConfig.getDataSource());
- Map<String, DataSource> dataSourceMap = sourceDataSource.entrySet().stream().collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().createDataSource()));
+ Map<String, DataSource> dataSourceMap = sourceDataSource.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().createDataSource()));
Map<String, Map<String, String>> dataSourceTableNameMap = toDataSourceTableNameMap(new ShardingRule(sourceRuleConfig, sourceConfig.getDatabaseType(), dataSourceMap));
Optional<ShardingRuleConfiguration> targetRuleConfig = getTargetRuleConfig(jobConfig);
filterByShardingDataSourceTables(dataSourceTableNameMap, jobConfig.getHandleConfig());
Map<String, Set<String>> shardingColumnsMap = getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig));
- for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
+ for (Map.Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
DumperConfiguration dumperConfig = createDumperConfig(entry.getKey(), sourceDataSource.get(entry.getKey()).getProps(), entry.getValue());
ImporterConfiguration importerConfig = createImporterConfig(jobConfig, shardingColumnsMap);
result.add(new TaskConfiguration(jobConfig.getHandleConfig(), dumperConfig, importerConfig));
@@ -110,7 +248,7 @@ public final class TaskConfigurationUtil {
}
Map<String, Set<String>> shardingDataSourceTableMap = toDataSourceTableNameMap(getShardingDataSourceTables(handleConfig));
dataSourceTableNameMap.entrySet().removeIf(entry -> !shardingDataSourceTableMap.containsKey(entry.getKey()));
- for (Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
+ for (Map.Entry<String, Map<String, String>> entry : dataSourceTableNameMap.entrySet()) {
filterByShardingTables(entry.getValue(), shardingDataSourceTableMap.get(entry.getKey()));
}
}
@@ -148,7 +286,7 @@ public final class TaskConfigurationUtil {
private static Map<String, Map<String, String>> toDataSourceTableNameMap(final TableRule tableRule) {
Map<String, Map<String, String>> result = new HashMap<>();
- for (Entry<String, Collection<String>> entry : tableRule.getDatasourceToTablesMap().entrySet()) {
+ for (Map.Entry<String, Collection<String>> entry : tableRule.getDatasourceToTablesMap().entrySet()) {
Map<String, String> tableNameMap = result.get(entry.getKey());
if (null == tableNameMap) {
result.put(entry.getKey(), toTableNameMap(tableRule.getLogicTable(), entry.getValue()));
@@ -168,7 +306,7 @@ public final class TaskConfigurationUtil {
}
private static void mergeDataSourceTableNameMap(final Map<String, Map<String, String>> mergedResult, final Map<String, Map<String, String>> newDataSourceTableNameMap) {
- for (Entry<String, Map<String, String>> entry : newDataSourceTableNameMap.entrySet()) {
+ for (Map.Entry<String, Map<String, String>> entry : newDataSourceTableNameMap.entrySet()) {
Map<String, String> tableNameMap = mergedResult.get(entry.getKey());
if (null == tableNameMap) {
mergedResult.put(entry.getKey(), entry.getValue());
@@ -220,113 +358,16 @@ public final class TaskConfigurationUtil {
}
/**
- * Fill in sharding tables.
+ * Init job config.
*
- * @param jobConfig job configuration
+ * @param configFile config file
+ * @return job configuration
*/
- public static void fillInShardingTables(final JobConfiguration jobConfig) {
- if (null != jobConfig.getHandleConfig().getShardingTables()) {
- return;
- }
- jobConfig.getHandleConfig().setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(jobConfig)));
- }
-
- private static List<String> getShouldScalingActualDataNodes(final JobConfiguration jobConfig) {
- ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
- Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
- "Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
- ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
- if (!(jobConfig.getRuleConfig().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
- return getShardingRuleConfigMap(source.getRule()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
+ @SneakyThrows(IOException.class)
+ public static JobConfiguration initJobConfig(final String configFile) {
+ try (InputStream fileInputStream = JobConfigurationUtil.class.getResourceAsStream(configFile);
+ InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream)) {
+ return new Gson().fromJson(inputStreamReader, JobConfiguration.class);
}
- ShardingSphereJDBCDataSourceConfiguration target =
- (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
- Set<String> modifiedDataSources = getModifiedDataSources(source.getDataSource(), target.getDataSource());
- Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap = getShardingRuleConfigMap(source.getRule());
- Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap = getShardingRuleConfigMap(target.getRule());
- List<String> result = new ArrayList<>();
- newShardingRuleConfigMap.keySet().forEach(each -> {
- if (!oldShardingRuleConfigMap.containsKey(each)) {
- return;
- }
- List<String> oldActualDataNodes = new InlineExpressionParser(oldShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
- List<String> newActualDataNodes = new InlineExpressionParser(newShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
- if (!CollectionUtils.isEqualCollection(oldActualDataNodes, newActualDataNodes) || includeModifiedDataSources(newActualDataNodes, modifiedDataSources)) {
- result.add(oldShardingRuleConfigMap.get(each).getActualDataNodes());
- }
- });
- return result;
- }
-
- private static Set<String> getModifiedDataSources(final String oldConfig, final String newConfig) {
- Set<String> result = new HashSet<>();
- Map<String, String> oldDataSourceUrlMap = getDataSourceUrlMap(oldConfig);
- Map<String, String> newDataSourceUrlMap = getDataSourceUrlMap(newConfig);
- newDataSourceUrlMap.forEach((key, value) -> {
- if (!value.equals(oldDataSourceUrlMap.get(key))) {
- result.add(key);
- }
- });
- return result;
- }
-
- private static Map<String, String> getDataSourceUrlMap(final String configuration) {
- Map<String, String> result = new HashMap<>();
- ConfigurationYamlConverter.loadDataSourceConfigs(configuration).forEach((key, value) -> {
- JdbcUri uri = new JdbcUri(value.getProps().getOrDefault("url", value.getProps().get("jdbcUrl")).toString());
- result.put(key, String.format("%s/%s", uri.getHost(), uri.getDatabase()));
- });
- return result;
- }
-
- private static boolean includeModifiedDataSources(final List<String> actualDataNodes, final Set<String> modifiedDataSources) {
- return actualDataNodes.stream().anyMatch(each -> modifiedDataSources.contains(each.split("\\.")[0]));
- }
-
- private static Map<String, ShardingTableRuleConfiguration> getShardingRuleConfigMap(final String configuration) {
- ShardingRuleConfiguration oldShardingRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(configuration);
- return oldShardingRuleConfig.getTables().stream().collect(Collectors.toMap(ShardingTableRuleConfiguration::getLogicTable, Function.identity()));
- }
-
- private static String[] groupByDataSource(final List<String> actualDataNodeList) {
- List<String> result = new ArrayList<>();
- Multimap<String, String> multiMap = getNodeMultiMap(actualDataNodeList);
- for (String key : multiMap.keySet()) {
- List<String> list = new ArrayList<>();
- for (String value : multiMap.get(key)) {
- list.add(String.format("%s.%s", key, value));
- }
- result.add(String.join(",", list));
- }
- return result.toArray(new String[0]);
- }
-
- private static Multimap<String, String> getNodeMultiMap(final List<String> actualDataNodeList) {
- Multimap<String, String> result = HashMultimap.create();
- for (String actualDataNodes : actualDataNodeList) {
- for (String actualDataNode : actualDataNodes.split(",")) {
- String[] nodeArray = split(actualDataNode);
- for (String dataSource : new InlineExpressionParser(nodeArray[0]).splitAndEvaluate()) {
- result.put(dataSource, nodeArray[1]);
- }
- }
- }
- return result;
- }
-
- private static String[] split(final String actualDataNode) {
- boolean flag = true;
- int i = 0;
- for (; i < actualDataNode.length(); i++) {
- char each = actualDataNode.charAt(i);
- if (each == '{') {
- flag = false;
- } else if (each == '}') {
- flag = true;
- } else if (flag && each == '.') {
- break;
- }
- }
- return new String[]{actualDataNode.substring(0, i), actualDataNode.substring(i + 1)};
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
similarity index 69%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
index 5c9b954..742e5f4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ReflectionUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ReflectionUtil.java
@@ -15,43 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.utils;
+package org.apache.shardingsphere.scaling.core.util;
import com.google.common.base.Preconditions;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.Map;
/**
* Reflection utils.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ReflectionUtil {
/**
- * Get field map.
- *
- * @param object object
- * @return field map
- * @throws IllegalAccessException illegal access exception
- */
- public static Map<String, Object> getFieldMap(final Object object) throws IllegalAccessException {
- Map<String, Object> result = new HashMap<>();
- for (Field field : object.getClass().getDeclaredFields()) {
- field.setAccessible(true);
- Object value = field.get(object);
- if (null != value) {
- result.put(field.getName(), value);
- }
- }
- return result;
- }
-
- /**
* Set value into target object field.
*
* @param target target object
@@ -66,37 +48,6 @@ public final class ReflectionUtil {
}
/**
- * Set field value into target object.
- *
- * @param targetClass target class
- * @param targetObject target object
- * @param fieldName field name
- * @param value target filed value
- * @throws NoSuchFieldException no such field exception
- * @throws IllegalAccessException illegal access exception
- */
- public static void setFieldValue(final Class<?> targetClass, final Object targetObject, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
- Field field = getField(targetClass, fieldName, true);
- field.setAccessible(true);
- field.set(targetObject, value);
- }
-
- /**
- * Set static field value.
- *
- * @param targetClass target class
- * @param fieldName field name
- * @param value new value
- * @throws NoSuchFieldException no such field exception
- * @throws IllegalAccessException illegal access exception
- */
- public static void setStaticFieldValue(final Class<?> targetClass, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
- Field field = getField(targetClass, fieldName, true);
- field.setAccessible(true);
- field.set(null, value);
- }
-
- /**
* Get field value from instance target object.
*
* @param target target object
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java
similarity index 91%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java
index 19760f5..5a23923 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ScalingTaskUtil.java
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.utils;
+package org.apache.shardingsphere.scaling.core.util;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.position.JobProgress;
@@ -29,25 +31,10 @@ import java.util.Objects;
/**
* Scaling task util.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ScalingTaskUtil {
/**
- * All inventory tasks is finished.
- *
- * @param inventoryTasks to check inventory tasks
- * @return is finished
- */
- public static boolean allInventoryTasksFinished(final List<InventoryTask> inventoryTasks) {
- return inventoryTasks.stream().allMatch(each -> each.getProgress().getPosition() instanceof FinishedPosition);
- }
-
- private static boolean allInventoryTasksFinished(final Map<Integer, JobProgress> jobProgress) {
- return jobProgress.values().stream()
- .flatMap(each -> each.getInventoryTaskProgressMap().values().stream())
- .allMatch(each -> each.getPosition() instanceof FinishedPosition);
- }
-
- /**
* All inventory tasks is finished and all incremental tasks is almost finished.
*
* @param jobProgressMap job progress map
@@ -55,12 +42,12 @@ public final class ScalingTaskUtil {
* @return almost finished or not
*/
public static boolean almostFinished(final Map<Integer, JobProgress> jobProgressMap, final HandleConfiguration handleConfig) {
- return isCompletedProgress(jobProgressMap, handleConfig)
+ return isProgressCompleted(jobProgressMap, handleConfig)
&& allInventoryTasksFinished(jobProgressMap)
&& allIncrementalTasksAlmostFinished(jobProgressMap, handleConfig);
}
- private static boolean isCompletedProgress(final Map<Integer, JobProgress> jobProgressMap, final HandleConfiguration handleConfig) {
+ private static boolean isProgressCompleted(final Map<Integer, JobProgress> jobProgressMap, final HandleConfiguration handleConfig) {
return handleConfig.getShardingTotalCount() == jobProgressMap.size()
&& jobProgressMap.values().stream().allMatch(Objects::nonNull);
}
@@ -70,4 +57,20 @@ public final class ScalingTaskUtil {
.flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
.allMatch(each -> each.getIncrementalTaskDelay().getDelayMilliseconds() <= handleConfig.getWorkflowConfig().getAllowDelayMilliseconds());
}
+
+ /**
+ * All inventory tasks is finished.
+ *
+ * @param inventoryTasks to check inventory tasks
+ * @return is finished
+ */
+ public static boolean allInventoryTasksFinished(final List<InventoryTask> inventoryTasks) {
+ return inventoryTasks.stream().allMatch(each -> each.getProgress().getPosition() instanceof FinishedPosition);
+ }
+
+ private static boolean allInventoryTasksFinished(final Map<Integer, JobProgress> jobProgress) {
+ return jobProgress.values().stream()
+ .flatMap(each -> each.getInventoryTaskProgressMap().values().stream())
+ .allMatch(each -> each.getPosition() instanceof FinishedPosition);
+ }
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ThreadUtil.java
similarity index 95%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ThreadUtil.java
index fb3ad48..3b6801f 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ThreadUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/util/ThreadUtil.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.utils;
+package org.apache.shardingsphere.scaling.core.util;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
deleted file mode 100644
index bc35fc2..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ElasticJobUtil.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.utils;
-
-import com.google.common.base.Strings;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
-import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
-import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
-
-import java.util.Properties;
-
-/**
- * Elastic job util.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ElasticJobUtil {
-
- /**
- * Create registry center.
- *
- * @return coordinator registry center
- */
- public static CoordinatorRegistryCenter createRegistryCenter() {
- CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(getZookeeperConfig());
- result.init();
- return result;
- }
-
- private static ZookeeperConfiguration getZookeeperConfig() {
- GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
- ZookeeperConfiguration result = new ZookeeperConfiguration(governanceConfig.getRegistryCenterConfiguration().getServerLists(),
- governanceConfig.getName() + ScalingConstant.SCALING_ROOT);
- Properties props = governanceConfig.getRegistryCenterConfiguration().getProps();
- result.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", result.getMaxSleepTimeMilliseconds()));
- result.setBaseSleepTimeMilliseconds(getProperty(props, "base.sleep.time.milliseconds", result.getBaseSleepTimeMilliseconds()));
- result.setConnectionTimeoutMilliseconds(getProperty(props, "connection.timeout.milliseconds", result.getConnectionTimeoutMilliseconds()));
- result.setSessionTimeoutMilliseconds(getProperty(props, "session.timeout.milliseconds", result.getSessionTimeoutMilliseconds()));
- return result;
- }
-
- private static int getProperty(final Properties props, final String key, final int defaultValue) {
- return Strings.isNullOrEmpty(props.getProperty(key)) ? defaultValue : Integer.parseInt(props.getProperty(key));
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JobConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JobConfigurationUtil.java
deleted file mode 100644
index e4a3ff3..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/JobConfigurationUtil.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.utils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.shardingsphere.scaling.core.config.HandleConfiguration;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ConfigurationYamlConverter;
-import org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
-import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.sharding.algorithm.sharding.inline.InlineExpressionParser;
-import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Job configuration util.
- */
-public final class JobConfigurationUtil {
-
- private static final SnowflakeKeyGenerateAlgorithm ID_AUTO_INCREASE_GENERATOR = initIdAutoIncreaseGenerator();
-
- private static SnowflakeKeyGenerateAlgorithm initIdAutoIncreaseGenerator() {
- SnowflakeKeyGenerateAlgorithm result = new SnowflakeKeyGenerateAlgorithm();
- result.init();
- return result;
- }
-
- private static Long generateKey() {
- return (Long) ID_AUTO_INCREASE_GENERATOR.generateKey();
- }
-
- /**
- * Fill in properties for job configuration.
- *
- * @param jobConfig job configuration
- */
- public static void fillInProperties(final JobConfiguration jobConfig) {
- HandleConfiguration handleConfig = jobConfig.getHandleConfig();
- if (null == handleConfig.getJobId()) {
- handleConfig.setJobId(generateKey());
- }
- if (Strings.isNullOrEmpty(handleConfig.getDatabaseType())) {
- handleConfig.setDatabaseType(jobConfig.getRuleConfig().getSource().unwrap().getDatabaseType().getName());
- }
- if (null == jobConfig.getHandleConfig().getShardingTables()) {
- handleConfig.setShardingTables(groupByDataSource(getShouldScalingActualDataNodes(jobConfig)));
- }
- }
-
- private static List<String> getShouldScalingActualDataNodes(final JobConfiguration jobConfig) {
- ScalingDataSourceConfiguration sourceConfig = jobConfig.getRuleConfig().getSource().unwrap();
- Preconditions.checkState(sourceConfig instanceof ShardingSphereJDBCDataSourceConfiguration,
- "Only ShardingSphereJdbc type of source ScalingDataSourceConfiguration is supported.");
- ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) sourceConfig;
- if (!(jobConfig.getRuleConfig().getTarget().unwrap() instanceof ShardingSphereJDBCDataSourceConfiguration)) {
- return getShardingRuleConfigMap(source.getRule()).values().stream().map(ShardingTableRuleConfiguration::getActualDataNodes).collect(Collectors.toList());
- }
- ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) jobConfig.getRuleConfig().getTarget().unwrap();
- return getShouldScalingActualDataNodes(getModifiedDataSources(source.getDataSource(), target.getDataSource()),
- getShardingRuleConfigMap(source.getRule()), getShardingRuleConfigMap(target.getRule()));
- }
-
- private static List<String> getShouldScalingActualDataNodes(final Set<String> modifiedDataSources,
- final Map<String, ShardingTableRuleConfiguration> oldShardingRuleConfigMap,
- final Map<String, ShardingTableRuleConfiguration> newShardingRuleConfigMap) {
- List<String> result = new ArrayList<>();
- newShardingRuleConfigMap.keySet().forEach(each -> {
- if (!oldShardingRuleConfigMap.containsKey(each)) {
- return;
- }
- List<String> oldActualDataNodes = new InlineExpressionParser(oldShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
- List<String> newActualDataNodes = new InlineExpressionParser(newShardingRuleConfigMap.get(each).getActualDataNodes()).splitAndEvaluate();
- if (!CollectionUtils.isEqualCollection(oldActualDataNodes, newActualDataNodes) || includeModifiedDataSources(newActualDataNodes, modifiedDataSources)) {
- result.add(oldShardingRuleConfigMap.get(each).getActualDataNodes());
- }
- });
- return result;
- }
-
- private static Set<String> getModifiedDataSources(final String oldConfig, final String newConfig) {
- Set<String> result = new HashSet<>();
- Map<String, String> oldDataSourceUrlMap = getDataSourceUrlMap(oldConfig);
- Map<String, String> newDataSourceUrlMap = getDataSourceUrlMap(newConfig);
- newDataSourceUrlMap.forEach((key, value) -> {
- if (!value.equals(oldDataSourceUrlMap.get(key))) {
- result.add(key);
- }
- });
- return result;
- }
-
- private static Map<String, String> getDataSourceUrlMap(final String configuration) {
- Map<String, String> result = new HashMap<>();
- ConfigurationYamlConverter.loadDataSourceConfigs(configuration).forEach((key, value) -> {
- JdbcUri uri = new JdbcUri(value.getProps().getOrDefault("url", value.getProps().get("jdbcUrl")).toString());
- result.put(key, String.format("%s/%s", uri.getHost(), uri.getDatabase()));
- });
- return result;
- }
-
- private static boolean includeModifiedDataSources(final List<String> actualDataNodes, final Set<String> modifiedDataSources) {
- return actualDataNodes.stream().anyMatch(each -> modifiedDataSources.contains(each.split("\\.")[0]));
- }
-
- private static Map<String, ShardingTableRuleConfiguration> getShardingRuleConfigMap(final String configuration) {
- ShardingRuleConfiguration oldShardingRuleConfig = ConfigurationYamlConverter.loadShardingRuleConfig(configuration);
- return oldShardingRuleConfig.getTables().stream().collect(Collectors.toMap(ShardingTableRuleConfiguration::getLogicTable, Function.identity()));
- }
-
- private static String[] groupByDataSource(final List<String> actualDataNodeList) {
- List<String> result = new ArrayList<>();
- Multimap<String, String> multiMap = getNodeMultiMap(actualDataNodeList);
- for (String key : multiMap.keySet()) {
- List<String> list = new ArrayList<>();
- for (String value : multiMap.get(key)) {
- list.add(String.format("%s.%s", key, value));
- }
- result.add(String.join(",", list));
- }
- return result.toArray(new String[0]);
- }
-
- private static Multimap<String, String> getNodeMultiMap(final List<String> actualDataNodeList) {
- Multimap<String, String> result = HashMultimap.create();
- for (String actualDataNodes : actualDataNodeList) {
- for (String actualDataNode : actualDataNodes.split(",")) {
- String[] nodeArray = split(actualDataNode);
- for (String dataSource : new InlineExpressionParser(nodeArray[0]).splitAndEvaluate()) {
- result.put(dataSource, nodeArray[1]);
- }
- }
- }
- return result;
- }
-
- private static String[] split(final String actualDataNode) {
- boolean flag = true;
- int i = 0;
- for (; i < actualDataNode.length(); i++) {
- char each = actualDataNode.charAt(i);
- if (each == '{') {
- flag = false;
- } else if (each == '}') {
- flag = true;
- } else if (flag && each == '.') {
- break;
- }
- }
- return new String[]{actualDataNode.substring(0, i), actualDataNode.substring(i + 1)};
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
deleted file mode 100644
index 160f686..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/RdbmsConfigurationUtil.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.utils;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
-
-/**
- * Rdbms configuration utility.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class RdbmsConfigurationUtil {
-
- /**
- * Get SQL where condition whit primary key.
- *
- * @param inventoryDumperConfig rdbms configuration
- * @return SQL where condition
- */
- public static String getWhereCondition(final InventoryDumperConfiguration inventoryDumperConfig) {
- return getWhereCondition(inventoryDumperConfig.getPrimaryKey(), inventoryDumperConfig.getPosition());
- }
-
- private static String getWhereCondition(final String primaryKey, final Position<?> position) {
- if (null == primaryKey || null == position) {
- return "";
- }
- PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) position;
- return String.format("WHERE %s BETWEEN %d AND %d", primaryKey, primaryKeyPosition.getBeginValue(), primaryKeyPosition.getEndValue());
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ShardingColumnsUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ShardingColumnsUtil.java
deleted file mode 100644
index d201c28..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ShardingColumnsUtil.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.utils;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Sharding columns util.
- */
-public final class ShardingColumnsUtil {
-
- /**
- * Is Sharding column.
- *
- * @param shardingColumnsMap sharding columns map
- * @param tableName table name
- * @param columnName column name
- * @return boolean
- */
- public static boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap,
- final String tableName, final String columnName) {
- return shardingColumnsMap.containsKey(tableName)
- && shardingColumnsMap.get(tableName).contains(columnName);
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
index d7e3101..7b17f00 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManagerTest.java
@@ -17,15 +17,14 @@
package org.apache.shardingsphere.scaling.core.datasource;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
+import org.apache.shardingsphere.scaling.core.job.JobContext;
import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.junit.Before;
import org.junit.Test;
import javax.sql.DataSource;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -39,9 +38,8 @@ public final class DataSourceManagerTest {
private List<TaskConfiguration> taskConfigurations;
@Before
- @SneakyThrows(IOException.class)
public void setUp() {
- taskConfigurations = JobConfigurationUtil.initJobContext("/config.json").getTaskConfigs();
+ taskConfigurations = new JobContext(JobConfigurationUtil.initJobConfig("/config.json")).getTaskConfigs();
}
@Test
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/ManualBitSetTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/ManualBitSetTest.java
index cf87251..0d59150 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/ManualBitSetTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/channel/bitset/ManualBitSetTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.execute.executor.channel.bitset;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.junit.Test;
import java.util.BitSet;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
index 5838db3..64fddd1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyCheckerTest.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.scaling.core.util.JobConfigurationUtil;
import org.junit.Test;
import javax.sql.DataSource;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -59,8 +58,7 @@ public final class AbstractDataConsistencyCheckerTest {
}
}
- @SneakyThrows(IOException.class)
private JobContext mockJobContext() {
- return JobConfigurationUtil.initJobContext("/config.json");
+ return new JobContext(JobConfigurationUtil.initJobConfig("/config.json"));
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
index 5aa8af4..3b3ab77 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryTaskSplitterTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.job.preparer.splitter;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.config.TaskConfiguration;
@@ -33,7 +32,6 @@ import org.junit.Before;
import org.junit.Test;
import javax.sql.DataSource;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -150,9 +148,8 @@ public final class InventoryTaskSplitterTest {
}
}
- @SneakyThrows(IOException.class)
private JobContext mockJobContext() {
- JobContext result = JobConfigurationUtil.initJobContext("/config.json");
+ JobContext result = new JobContext(JobConfigurationUtil.initJobConfig("/config.json"));
result.getJobConfig().getHandleConfig().setDatabaseType("H2");
result.getJobConfig().getHandleConfig().setShardingSize(10);
taskConfig = new TaskConfiguration(result.getJobConfig().getHandleConfig(), mockDumperConfig(), new ImporterConfiguration());
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
index 975719a..6662f61 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JDBCUtilTest.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration
import org.apache.shardingsphere.scaling.core.config.datasource.ConfigurationYamlConverter;
import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.utils.JDBCUtil;
import org.junit.Test;
import java.util.ArrayList;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
deleted file mode 100644
index fcbb624..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/util/JobConfigurationUtil.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.util;
-
-import com.google.gson.Gson;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
-import org.apache.shardingsphere.scaling.core.job.JobContext;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Job configuration util.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class JobConfigurationUtil {
-
- /**
- * Init job config.
- *
- * @param configFile config file
- * @return job configuration
- * @throws IOException IO exception
- */
- public static JobConfiguration initJobConfig(final String configFile) throws IOException {
- try (InputStream fileInputStream = JobConfigurationUtil.class.getResourceAsStream(configFile);
- InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream)) {
- return new Gson().fromJson(inputStreamReader, JobConfiguration.class);
- }
- }
-
- /**
- * Init job context by config file.
- *
- * @param configFile config file
- * @return scaling job
- * @throws IOException IO exception
- */
- public static JobContext initJobContext(final String configFile) throws IOException {
- return new JobContext(initJobConfig(configFile));
- }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
index 39f2155..569dacd 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
-import org.apache.shardingsphere.scaling.core.utils.JDBCUtil;
+import org.apache.shardingsphere.scaling.core.util.JDBCUtil;
import java.util.Map;
import java.util.Set;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLJdbcDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLJdbcDumper.java
index dc022c2..dcdb77a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLJdbcDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLJdbcDumper.java
@@ -21,7 +21,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.AbstractJDBCDumper;
-import org.apache.shardingsphere.scaling.core.utils.JDBCUtil;
+import org.apache.shardingsphere.scaling.core.util.JDBCUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
index 4c308b6..6a9abea 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.AbstractScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
-import org.apache.shardingsphere.scaling.core.utils.ShardingColumnsUtil;
import java.util.Map;
import java.util.Set;
@@ -54,8 +53,7 @@ public final class MySQLScalingSQLBuilder extends AbstractScalingSQLBuilder impl
StringBuilder result = new StringBuilder(" ON DUPLICATE KEY UPDATE ");
for (int i = 0; i < dataRecord.getColumnCount(); i++) {
Column column = dataRecord.getColumn(i);
- if (column.isPrimaryKey() || ShardingColumnsUtil.isShardingColumn(
- getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
+ if (column.isPrimaryKey() || isShardingColumn(getShardingColumnsMap(), dataRecord.getTableName(), column.getName())) {
continue;
}
result.append(quote(column.getName())).append("=VALUES(").append(quote(column.getName())).append("),");
@@ -64,6 +62,12 @@ public final class MySQLScalingSQLBuilder extends AbstractScalingSQLBuilder impl
return result.toString();
}
+ private boolean isShardingColumn(final Map<String, Set<String>> shardingColumnsMap,
+ final String tableName, final String columnName) {
+ return shardingColumnsMap.containsKey(tableName)
+ && shardingColumnsMap.get(tableName).contains(columnName);
+ }
+
/**
* Build select sum crc32 SQL.
*
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
index e7e8ccf..5334483 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/MySQLClientTest.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLCo
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLBinlogEventPacketDecoderTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLBinlogEventPacketDecoderTest.java
index a5b34bf..f3a2c2b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLBinlogEventPacketDecoderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLBinlogEventPacketDecoderTest.java
@@ -21,7 +21,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType;
import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogContext;
import org.apache.shardingsphere.scaling.mysql.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.UpdateRowsEvent;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLCommandPacketDecoderTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLCommandPacketDecoderTest.java
index 645cbce..2a60827 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLCommandPacketDecoderTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLCommandPacketDecoderTest.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLEofPacket
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandshakePacket;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.mysql.client.InternalResultSet;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLNegotiateHandlerTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLNegotiateHandlerTest.java
index 5b8ff45..16098bb 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLNegotiateHandlerTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/client/netty/MySQLNegotiateHandlerTest.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLAuthPluginData;
import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandshakePacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.handshake.MySQLHandshakeResponse41Packet;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.mysql.client.ServerInfo;
import org.junit.Before;
import org.junit.Test;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumperTest.java
index d7d3a2a..1f17e67 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLBinlogDumperTest.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.metadata.JdbcUri;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
import org.apache.shardingsphere.scaling.mysql.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.scaling.mysql.binlog.event.DeleteRowsEvent;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
index 4e713af..ea2d44b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.position.Position;
-import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
index 107f76a..1867bc8 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.exception.ScalingTaskExecuteException;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.MemoryChannel;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
+import org.apache.shardingsphere.scaling.core.util.ReflectionUtil;
import org.apache.shardingsphere.scaling.postgresql.wal.LogicalReplication;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.junit.Before;