You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/02/26 13:40:00 UTC
[shardingsphere] branch master updated: Optimize reset scaling job (#9523)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 144f067 Optimize reset scaling job (#9523)
144f067 is described below
commit 144f067f67e3a778672f6ce9a2fba5d812aa6a96
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Feb 26 21:39:24 2021 +0800
Optimize reset scaling job (#9523)
* rename ScalingOperateExecuteException to ScalingJobOperateException
* Optimize ScalingJobNotFoundException
* rename function
* Optimize reset scaling job
Co-authored-by: qiulu3 <Lucas209910>
---
.../shardingsphere/db/protocol/error/CommonErrorCode.java | 4 +++-
...ecuteException.java => ScalingJobOperateException.java} | 4 ++--
.../distsql/ral/impl/ResetScalingJobBackendHandler.java | 8 ++++----
.../proxy/frontend/mysql/MySQLErrPacketFactory.java | 6 +++---
.../shardingsphere/scaling/web/HttpServerHandler.java | 4 ++--
.../scaling/core/api/RegistryRepositoryAPI.java | 7 +++++++
.../apache/shardingsphere/scaling/core/api/ScalingAPI.java | 14 +++++++-------
.../scaling/core/api/impl/RegistryRepositoryAPIImpl.java | 13 +++++++------
.../scaling/core/api/impl/ScalingAPIImpl.java | 5 +++--
.../core/common/exception/ScalingJobNotFoundException.java | 8 +++++++-
.../scaling/core/api/impl/ScalingAPIImplTest.java | 2 +-
11 files changed, 46 insertions(+), 29 deletions(-)
diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
index 0d4fffb..a0c5aa2 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
@@ -49,7 +49,9 @@ public enum CommonErrorCode implements SQLErrorCode {
SHARDING_TABLE_RULE_EXIST(11010, "C11010", "Sharding table rules: [%s] already exists."),
- SCALING_OPERATE_FAILED(12001, "C12001", "Scaling Operate Failed: [%s]"),
+ SCALING_JOB_NOT_EXIST(12001, "C12001", "Scaling job %s does not exist."),
+
+ SCALING_OPERATE_FAILED(12009, "C12009", "Scaling Operate Failed: [%s]"),
UNSUPPORTED_COMMAND(19998, "C19998", "Unsupported command: [%s]"),
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingOperateExecuteException.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingJobOperateException.java
similarity index 90%
rename from shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingOperateExecuteException.java
rename to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingJobOperateException.java
index b961024..d196efe 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingOperateExecuteException.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingJobOperateException.java
@@ -21,11 +21,11 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
- * Scaling operate execute exception.
+ * Scaling job operate exception.
*/
@RequiredArgsConstructor
@Getter
-public final class ScalingOperateExecuteException extends BackendException {
+public final class ScalingJobOperateException extends BackendException {
private static final long serialVersionUID = 7598088400647370901L;
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java
index 687b4d8..0cb1f12 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.proxy.backend.text.distsql.ral.impl;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.parser.statement.ral.impl.ResetScalingJobStatement;
-import org.apache.shardingsphere.proxy.backend.exception.ScalingOperateExecuteException;
+import org.apache.shardingsphere.proxy.backend.exception.ScalingJobOperateException;
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
@@ -41,9 +41,9 @@ public final class ResetScalingJobBackendHandler implements TextProtocolBackendH
@Override
public ResponseHeader execute() {
try {
- scalingAPI.resetTargetTable(sqlStatement.getJobId());
- } catch (SQLException ex) {
- throw new ScalingOperateExecuteException(ex.getMessage());
+ scalingAPI.reset(sqlStatement.getJobId());
+ } catch (final SQLException ex) {
+ throw new ScalingJobOperateException(ex.getMessage());
}
return new UpdateResponseHeader(sqlStatement);
}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java
index 13b5fe2..9d9aded 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.proxy.backend.exception.ReplicaQueryRuleNotExis
import org.apache.shardingsphere.proxy.backend.exception.ResourceInUsedException;
import org.apache.shardingsphere.proxy.backend.exception.ResourceNotExistedException;
import org.apache.shardingsphere.proxy.backend.exception.RuleNotExistsException;
-import org.apache.shardingsphere.proxy.backend.exception.ScalingOperateExecuteException;
import org.apache.shardingsphere.proxy.backend.exception.ShardingRuleNotExistedException;
import org.apache.shardingsphere.proxy.backend.exception.ShardingTableRuleExistedException;
import org.apache.shardingsphere.proxy.backend.exception.ShardingTableRuleNotExistedException;
@@ -46,6 +45,7 @@ import org.apache.shardingsphere.proxy.backend.text.sctl.ShardingCTLErrorCode;
import org.apache.shardingsphere.proxy.backend.text.sctl.exception.ShardingCTLException;
import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedCommandException;
import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
+import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.sharding.route.engine.exception.NoSuchTableException;
import org.apache.shardingsphere.sharding.route.engine.exception.TableExistsException;
import org.apache.shardingsphere.sql.parser.exception.SQLParsingException;
@@ -143,8 +143,8 @@ public final class MySQLErrPacketFactory {
if (cause instanceof ReplicaQueryRuleCreateExistsException) {
return new MySQLErrPacket(1, CommonErrorCode.REPLICA_QUERY_RULE_EXIST);
}
- if (cause instanceof ScalingOperateExecuteException) {
- return new MySQLErrPacket(1, CommonErrorCode.SCALING_OPERATE_FAILED, cause.getMessage());
+ if (cause instanceof ScalingJobNotFoundException) {
+ return new MySQLErrPacket(1, CommonErrorCode.SCALING_JOB_NOT_EXIST, ((ScalingJobNotFoundException) cause).getJobId());
}
return new MySQLErrPacket(1, CommonErrorCode.UNKNOWN_EXCEPTION, cause.getMessage());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 75904f3..ac0aad9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -37,8 +37,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.api.JobInfo;
import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.util.ResponseContentUtil;
import java.sql.SQLException;
@@ -125,7 +125,7 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
private void resetJob(final ChannelHandlerContext context, final String requestPath) {
try {
- scalingAPI.resetTargetTable(getJobId(requestPath));
+ scalingAPI.reset(getJobId(requestPath));
response(ResponseContentUtil.success(), context, HttpResponseStatus.OK);
} catch (final ScalingJobNotFoundException | SQLException ex) {
response(ResponseContentUtil.handleBadRequest(ex.getMessage()), context, HttpResponseStatus.BAD_REQUEST);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
index 8d6344d..fba1e01 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
@@ -45,6 +45,13 @@ public interface RegistryRepositoryAPI {
JobProgress getJobProgress(long jobId, int shardingItem);
/**
+ * Delete job progress.
+ *
+ * @param jobId job id
+ */
+ void deleteJobProgress(long jobId);
+
+ /**
* Delete job.
*
* @param jobId job id
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
index 5507432..de78cf0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
@@ -39,14 +39,14 @@ public interface ScalingAPI {
List<JobInfo> list();
/**
- * Start a scaling job by id.
+ * Start scaling job by id.
*
* @param jobId job id
*/
void start(long jobId);
/**
- * Start a scaling job by config.
+ * Start scaling job by config.
*
* @param jobConfig job config
* @return job id
@@ -54,21 +54,21 @@ public interface ScalingAPI {
Optional<Long> start(JobConfiguration jobConfig);
/**
- * Stop a job.
+ * Stop scaling job.
*
* @param jobId job id
*/
void stop(long jobId);
/**
- * Remove a job.
+ * Remove scaling job.
*
* @param jobId job id
*/
void remove(long jobId);
/**
- * Get progress.
+ * Get job progress.
*
* @param jobId job id
* @return each sharding item progress
@@ -84,12 +84,12 @@ public interface ScalingAPI {
Map<String, DataConsistencyCheckResult> dataConsistencyCheck(long jobId);
/**
- * Reset target table.
+ * Reset scaling job.
*
* @param jobId job id
* @throws SQLException SQL exception
*/
- void resetTargetTable(long jobId) throws SQLException;
+ void reset(long jobId) throws SQLException;
/**
* Get job configuration.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index 4852f33..ec86a13 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -72,16 +72,17 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
@Override
public JobProgress getJobProgress(final long jobId, final int shardingItem) {
- String data = null;
- try {
- data = registryRepository.get(getOffsetPath(jobId, shardingItem));
- } catch (final NullPointerException ex) {
- log.info("job {}-{} without break point.", jobId, shardingItem);
- }
+ String data = registryRepository.get(getOffsetPath(jobId, shardingItem));
return Strings.isNullOrEmpty(data) ? null : JobProgress.init(data);
}
@Override
+ public void deleteJobProgress(final long jobId) {
+ log.info("delete job progress {}", jobId);
+ registryRepository.delete(String.format("%s/%d/offset", ScalingConstant.SCALING_ROOT, jobId));
+ }
+
+ @Override
public void deleteJob(final long jobId) {
log.info("delete job {}", jobId);
registryRepository.delete(String.format("%s/%d", ScalingConstant.SCALING_ROOT, jobId));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index cd2f64e..e387d4d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -126,8 +126,9 @@ public final class ScalingAPIImpl implements ScalingAPI {
}
@Override
- public void resetTargetTable(final long jobId) throws SQLException {
+ public void reset(final long jobId) throws SQLException {
log.info("Scaling job {} reset target table", jobId);
+ ScalingAPIFactory.getRegistryRepositoryAPI().deleteJobProgress(jobId);
new ScalingEnvironmentManager().resetTargetTable(new JobContext(getJobConfig(jobId)));
}
@@ -144,7 +145,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
try {
return ScalingAPIFactory.getJobConfigurationAPI().getJobConfiguration(String.valueOf(jobId));
} catch (final NullPointerException ex) {
- throw new ScalingJobNotFoundException(String.format("Can not find scaling job %s", jobId));
+ throw new ScalingJobNotFoundException(String.format("Can not find scaling job %s", jobId), jobId);
}
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java
index 217e6c0..bc614c4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java
@@ -17,14 +17,20 @@
package org.apache.shardingsphere.scaling.core.common.exception;
+import lombok.Getter;
+
/**
* Scaling job not found exception.
*/
+@Getter
public final class ScalingJobNotFoundException extends RuntimeException {
private static final long serialVersionUID = -903289953649758722L;
- public ScalingJobNotFoundException(final String message) {
+ private final long jobId;
+
+ public ScalingJobNotFoundException(final String message, final long jobId) {
super(message);
+ this.jobId = jobId;
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index bf742e1..d932eb2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -129,7 +129,7 @@ public final class ScalingAPIImplTest {
assertTrue(jobId.isPresent());
JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId.get());
initTableData(jobConfig.getRuleConfig());
- scalingAPI.resetTargetTable(jobId.get());
+ scalingAPI.reset(jobId.get());
Map<String, DataConsistencyCheckResult> checkResultMap = scalingAPI.dataConsistencyCheck(jobId.get());
assertThat(checkResultMap.get("t_order").getTargetCount(), is(0L));
}