You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/02/26 13:40:00 UTC

[shardingsphere] branch master updated: Optimize reset scaling job (#9523)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 144f067  Optimize reset scaling job (#9523)
144f067 is described below

commit 144f067f67e3a778672f6ce9a2fba5d812aa6a96
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Fri Feb 26 21:39:24 2021 +0800

    Optimize reset scaling job (#9523)
    
    * rename ScalingOperateExecuteException to ScalingJobOperateException
    
    * Optimize ScalingJobNotFoundException
    
    * rename function
    
    * Optimize reset scaling job
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../shardingsphere/db/protocol/error/CommonErrorCode.java  |  4 +++-
 ...ecuteException.java => ScalingJobOperateException.java} |  4 ++--
 .../distsql/ral/impl/ResetScalingJobBackendHandler.java    |  8 ++++----
 .../proxy/frontend/mysql/MySQLErrPacketFactory.java        |  6 +++---
 .../shardingsphere/scaling/web/HttpServerHandler.java      |  4 ++--
 .../scaling/core/api/RegistryRepositoryAPI.java            |  7 +++++++
 .../apache/shardingsphere/scaling/core/api/ScalingAPI.java | 14 +++++++-------
 .../scaling/core/api/impl/RegistryRepositoryAPIImpl.java   | 13 +++++++------
 .../scaling/core/api/impl/ScalingAPIImpl.java              |  5 +++--
 .../core/common/exception/ScalingJobNotFoundException.java |  8 +++++++-
 .../scaling/core/api/impl/ScalingAPIImplTest.java          |  2 +-
 11 files changed, 46 insertions(+), 29 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
index 0d4fffb..a0c5aa2 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
@@ -49,7 +49,9 @@ public enum CommonErrorCode implements SQLErrorCode {
     
     SHARDING_TABLE_RULE_EXIST(11010, "C11010", "Sharding table rules: [%s] already exists."),
     
-    SCALING_OPERATE_FAILED(12001, "C12001", "Scaling Operate Failed: [%s]"),
+    SCALING_JOB_NOT_EXIST(12001, "C12001", "Scaling job %s does not exist."),
+    
+    SCALING_OPERATE_FAILED(12009, "C12009", "Scaling Operate Failed: [%s]"),
     
     UNSUPPORTED_COMMAND(19998, "C19998", "Unsupported command: [%s]"),
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingOperateExecuteException.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingJobOperateException.java
similarity index 90%
rename from shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingOperateExecuteException.java
rename to shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingJobOperateException.java
index b961024..d196efe 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingOperateExecuteException.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ScalingJobOperateException.java
@@ -21,11 +21,11 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
 /**
- * Scaling operate execute exception.
+ * Scaling job operate exception.
  */
 @RequiredArgsConstructor
 @Getter
-public final class ScalingOperateExecuteException extends BackendException {
+public final class ScalingJobOperateException extends BackendException {
     
     private static final long serialVersionUID = 7598088400647370901L;
     
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java
index 687b4d8..0cb1f12 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/impl/ResetScalingJobBackendHandler.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.proxy.backend.text.distsql.ral.impl;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.distsql.parser.statement.ral.impl.ResetScalingJobStatement;
-import org.apache.shardingsphere.proxy.backend.exception.ScalingOperateExecuteException;
+import org.apache.shardingsphere.proxy.backend.exception.ScalingJobOperateException;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
@@ -41,9 +41,9 @@ public final class ResetScalingJobBackendHandler implements TextProtocolBackendH
     @Override
     public ResponseHeader execute() {
         try {
-            scalingAPI.resetTargetTable(sqlStatement.getJobId());
-        } catch (SQLException ex) {
-            throw new ScalingOperateExecuteException(ex.getMessage());
+            scalingAPI.reset(sqlStatement.getJobId());
+        } catch (final SQLException ex) {
+            throw new ScalingJobOperateException(ex.getMessage());
         }
         return new UpdateResponseHeader(sqlStatement);
     }
diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java
index 13b5fe2..9d9aded 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLErrPacketFactory.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.proxy.backend.exception.ReplicaQueryRuleNotExis
 import org.apache.shardingsphere.proxy.backend.exception.ResourceInUsedException;
 import org.apache.shardingsphere.proxy.backend.exception.ResourceNotExistedException;
 import org.apache.shardingsphere.proxy.backend.exception.RuleNotExistsException;
-import org.apache.shardingsphere.proxy.backend.exception.ScalingOperateExecuteException;
 import org.apache.shardingsphere.proxy.backend.exception.ShardingRuleNotExistedException;
 import org.apache.shardingsphere.proxy.backend.exception.ShardingTableRuleExistedException;
 import org.apache.shardingsphere.proxy.backend.exception.ShardingTableRuleNotExistedException;
@@ -46,6 +45,7 @@ import org.apache.shardingsphere.proxy.backend.text.sctl.ShardingCTLErrorCode;
 import org.apache.shardingsphere.proxy.backend.text.sctl.exception.ShardingCTLException;
 import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedCommandException;
 import org.apache.shardingsphere.proxy.frontend.exception.UnsupportedPreparedStatementException;
+import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.sharding.route.engine.exception.NoSuchTableException;
 import org.apache.shardingsphere.sharding.route.engine.exception.TableExistsException;
 import org.apache.shardingsphere.sql.parser.exception.SQLParsingException;
@@ -143,8 +143,8 @@ public final class MySQLErrPacketFactory {
         if (cause instanceof ReplicaQueryRuleCreateExistsException) {
             return new MySQLErrPacket(1, CommonErrorCode.REPLICA_QUERY_RULE_EXIST);
         }
-        if (cause instanceof ScalingOperateExecuteException) {
-            return new MySQLErrPacket(1, CommonErrorCode.SCALING_OPERATE_FAILED, cause.getMessage());
+        if (cause instanceof ScalingJobNotFoundException) {
+            return new MySQLErrPacket(1, CommonErrorCode.SCALING_JOB_NOT_EXIST, ((ScalingJobNotFoundException) cause).getJobId());
         }
         return new MySQLErrPacket(1, CommonErrorCode.UNKNOWN_EXCEPTION, cause.getMessage());
     }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 75904f3..ac0aad9 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -37,8 +37,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.api.JobInfo;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
-import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.common.exception.ScalingJobNotFoundException;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.util.ResponseContentUtil;
 
 import java.sql.SQLException;
@@ -125,7 +125,7 @@ public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHtt
     
     private void resetJob(final ChannelHandlerContext context, final String requestPath) {
         try {
-            scalingAPI.resetTargetTable(getJobId(requestPath));
+            scalingAPI.reset(getJobId(requestPath));
             response(ResponseContentUtil.success(), context, HttpResponseStatus.OK);
         } catch (final ScalingJobNotFoundException | SQLException ex) {
             response(ResponseContentUtil.handleBadRequest(ex.getMessage()), context, HttpResponseStatus.BAD_REQUEST);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
index 8d6344d..fba1e01 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/RegistryRepositoryAPI.java
@@ -45,6 +45,13 @@ public interface RegistryRepositoryAPI {
     JobProgress getJobProgress(long jobId, int shardingItem);
     
     /**
+     * Delete job progress.
+     *
+     * @param jobId job id
+     */
+    void deleteJobProgress(long jobId);
+    
+    /**
      * Delete job.
      *
      * @param jobId job id
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
index 5507432..de78cf0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/ScalingAPI.java
@@ -39,14 +39,14 @@ public interface ScalingAPI {
     List<JobInfo> list();
     
     /**
-     * Start a scaling job by id.
+     * Start scaling job by id.
      *
      * @param jobId job id
      */
     void start(long jobId);
     
     /**
-     * Start a scaling job by config.
+     * Start scaling job by config.
      *
      * @param jobConfig job config
      * @return job id
@@ -54,21 +54,21 @@ public interface ScalingAPI {
     Optional<Long> start(JobConfiguration jobConfig);
     
     /**
-     * Stop a job.
+     * Stop scaling job.
      *
      * @param jobId job id
      */
     void stop(long jobId);
     
     /**
-     * Remove a job.
+     * Remove scaling job.
      *
      * @param jobId job id
      */
     void remove(long jobId);
     
     /**
-     * Get progress.
+     * Get job progress.
      *
      * @param jobId job id
      * @return each sharding item progress
@@ -84,12 +84,12 @@ public interface ScalingAPI {
     Map<String, DataConsistencyCheckResult> dataConsistencyCheck(long jobId);
     
     /**
-     * Reset target table.
+     * Reset scaling job.
      *
      * @param jobId job id
      * @throws SQLException SQL exception
      */
-    void resetTargetTable(long jobId) throws SQLException;
+    void reset(long jobId) throws SQLException;
     
     /**
      * Get job configuration.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
index 4852f33..ec86a13 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/RegistryRepositoryAPIImpl.java
@@ -72,16 +72,17 @@ public final class RegistryRepositoryAPIImpl implements RegistryRepositoryAPI {
     
     @Override
     public JobProgress getJobProgress(final long jobId, final int shardingItem) {
-        String data = null;
-        try {
-            data = registryRepository.get(getOffsetPath(jobId, shardingItem));
-        } catch (final NullPointerException ex) {
-            log.info("job {}-{} without break point.", jobId, shardingItem);
-        }
+        String data = registryRepository.get(getOffsetPath(jobId, shardingItem));
         return Strings.isNullOrEmpty(data) ? null : JobProgress.init(data);
     }
     
     @Override
+    public void deleteJobProgress(final long jobId) {
+        log.info("delete job progress {}", jobId);
+        registryRepository.delete(String.format("%s/%d/offset", ScalingConstant.SCALING_ROOT, jobId));
+    }
+    
+    @Override
     public void deleteJob(final long jobId) {
         log.info("delete job {}", jobId);
         registryRepository.delete(String.format("%s/%d", ScalingConstant.SCALING_ROOT, jobId));
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
index cd2f64e..e387d4d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImpl.java
@@ -126,8 +126,9 @@ public final class ScalingAPIImpl implements ScalingAPI {
     }
     
     @Override
-    public void resetTargetTable(final long jobId) throws SQLException {
+    public void reset(final long jobId) throws SQLException {
         log.info("Scaling job {} reset target table", jobId);
+        ScalingAPIFactory.getRegistryRepositoryAPI().deleteJobProgress(jobId);
         new ScalingEnvironmentManager().resetTargetTable(new JobContext(getJobConfig(jobId)));
     }
     
@@ -144,7 +145,7 @@ public final class ScalingAPIImpl implements ScalingAPI {
         try {
             return ScalingAPIFactory.getJobConfigurationAPI().getJobConfiguration(String.valueOf(jobId));
         } catch (final NullPointerException ex) {
-            throw new ScalingJobNotFoundException(String.format("Can not find scaling job %s", jobId));
+            throw new ScalingJobNotFoundException(String.format("Can not find scaling job %s", jobId), jobId);
         }
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java
index 217e6c0..bc614c4 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/common/exception/ScalingJobNotFoundException.java
@@ -17,14 +17,20 @@
 
 package org.apache.shardingsphere.scaling.core.common.exception;
 
+import lombok.Getter;
+
 /**
  * Scaling job not found exception.
  */
+@Getter
 public final class ScalingJobNotFoundException extends RuntimeException {
     
     private static final long serialVersionUID = -903289953649758722L;
     
-    public ScalingJobNotFoundException(final String message) {
+    private final long jobId;
+    
+    public ScalingJobNotFoundException(final String message, final long jobId) {
         super(message);
+        this.jobId = jobId;
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
index bf742e1..d932eb2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/api/impl/ScalingAPIImplTest.java
@@ -129,7 +129,7 @@ public final class ScalingAPIImplTest {
         assertTrue(jobId.isPresent());
         JobConfiguration jobConfig = scalingAPI.getJobConfig(jobId.get());
         initTableData(jobConfig.getRuleConfig());
-        scalingAPI.resetTargetTable(jobId.get());
+        scalingAPI.reset(jobId.get());
         Map<String, DataConsistencyCheckResult> checkResultMap = scalingAPI.dataConsistencyCheck(jobId.get());
         assertThat(checkResultMap.get("t_order").getTargetCount(), is(0L));
     }