You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/02/17 01:57:37 UTC

[shardingsphere] branch master updated: [DistSQL] Make workerThread, batchSize and rateLimiter optional in sharding scaling rule. (#15456)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aacef7  [DistSQL] Make workerThread, batchSize and rateLimiter optional in sharding scaling rule. (#15456)
1aacef7 is described below

commit 1aacef7ff91d5ebc84481a69a158ca2418352b88
Author: Raigor <ra...@gmail.com>
AuthorDate: Thu Feb 17 09:56:29 2022 +0800

    [DistSQL] Make workerThread, batchSize and rateLimiter optional in sharding scaling rule. (#15456)
---
 .../syntax/rdl/rule-definition/sharding.cn.md      |  4 +--
 .../syntax/rdl/rule-definition/sharding.en.md      |  4 +--
 .../CreateShardingScalingRuleStatementUpdater.java | 18 +++++++-----
 .../main/antlr4/imports/scaling/RDLStatement.g4    |  4 +--
 .../parser/core/ScalingSQLStatementVisitor.java    | 34 ++++++++++++++++++----
 .../statement/segment/InputOrOutputSegment.java    |  4 +--
 6 files changed, 46 insertions(+), 22 deletions(-)

diff --git a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
index 7ab94b4..03b306f 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
@@ -154,10 +154,10 @@ scalingRuleDefinition:
     [inputDefinition] [, outputDefinition] [, streamChannel] [, completionDetector] [, dataConsistencyChecker]
 
 inputDefinition:
-    INPUT (workerThread, batchSize, rateLimiter)
+    INPUT ([workerThread] [, batchSize] [, rateLimiter])
 
 outputDefinition:
-    INPUT (workerThread, batchSize, rateLimiter)
+    OUTPUT ([workerThread] [, batchSize] [, rateLimiter])
 
 completionDetector:
     COMPLETION_DETECTOR (algorithmDefinition)
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
index 7e20f93..e7e586b 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
@@ -154,10 +154,10 @@ scalingRuleDefinition:
     [inputDefinition] [, outputDefinition] [, streamChannel] [, completionDetector] [, dataConsistencyChecker]
 
 inputDefinition:
-    INPUT (workerThread, batchSize, rateLimiter)
+    INPUT ([workerThread] [, batchSize] [, rateLimiter])
 
 outputDefinition:
-    INPUT (workerThread, batchSize, rateLimiter)
+    OUTPUT ([workerThread] [, batchSize] [, rateLimiter])
 
 completionDetector:
     COMPLETION_DETECTOR (algorithmDefinition)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
index 20dc278..4f9e0b0 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
@@ -61,7 +61,7 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
         String schemaName = shardingSphereMetaData.getName();
         checkCurrentRuleConfiguration(schemaName, currentRuleConfig);
         checkDuplicate(schemaName, sqlStatement, currentRuleConfig);
-        checkAlgorithms(sqlStatement);
+        checkAlgorithms(sqlStatement.getConfigurationSegment());
     }
     
     private void checkCurrentRuleConfiguration(final String schemaName, final ShardingRuleConfiguration currentRuleConfig) throws RequiredRuleMissedException {
@@ -76,14 +76,14 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
         }
     }
     
-    private void checkAlgorithms(final CreateShardingScalingRuleStatement sqlStatement) throws DistSQLException {
-        if (null == sqlStatement.getConfigurationSegment()) {
+    private void checkAlgorithms(final ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
+        if (null == segment) {
             return;
         }
-        checkRateLimiterExist(sqlStatement.getConfigurationSegment());
-        checkStreamChannelExist(sqlStatement.getConfigurationSegment());
-        checkCompletionDetectorExist(sqlStatement.getConfigurationSegment());
-        checkDataConsistencyCheckerExist(sqlStatement.getConfigurationSegment());
+        checkRateLimiterExist(segment);
+        checkStreamChannelExist(segment);
+        checkCompletionDetectorExist(segment);
+        checkDataConsistencyCheckerExist(segment);
     }
     
     private void checkRateLimiterExist(final ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
@@ -96,7 +96,9 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
     }
     
     private void checkRateLimiterAlgorithm(final AlgorithmSegment rateLimiter) throws DistSQLException {
-        checkAlgorithm(RATE_LIMIT_ALGORITHM_HOLDER, "rate limiter", rateLimiter);
+        if (null != rateLimiter) {
+            checkAlgorithm(RATE_LIMIT_ALGORITHM_HOLDER, "rate limiter", rateLimiter);
+        }
     }
     
     private void checkStreamChannelExist(final ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
index b022a6d..6ed1bf8 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
@@ -44,11 +44,11 @@ scalingRuleDefinition
     ;
 
 inputDefinition
-    : INPUT LP workerThread COMMA batchSize COMMA rateLimiter RP
+    : INPUT LP workerThread? (COMMA? batchSize)? (COMMA? rateLimiter)? RP
     ;
 
 outputDefinition
-    : OUTPUT LP workerThread COMMA batchSize COMMA rateLimiter RP
+    : OUTPUT LP workerThread? (COMMA? batchSize)? (COMMA? rateLimiter)? RP
     ;
 
 completionDetector
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
index 0f804cb..bef5785 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementBaseVisi
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser;
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.AlgorithmDefinitionContext;
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.ApplyScalingContext;
+import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.BatchSizeContext;
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CheckScalingContext;
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CompletionDetectorContext;
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CreateShardingScalingRuleContext;
@@ -43,6 +44,7 @@ import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.S
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StopScalingContext;
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StopScalingSourceWritingContext;
 import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StreamChannelContext;
+import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.WorkerThreadContext;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.scaling.distsql.statement.ApplyScalingStatement;
 import org.apache.shardingsphere.scaling.distsql.statement.CheckScalingStatement;
@@ -159,20 +161,40 @@ public final class ScalingSQLStatementVisitor extends ScalingStatementBaseVisito
     
     @Override
     public ASTNode visitInputDefinition(final InputDefinitionContext ctx) {
-        int workerThread = Integer.parseInt(ctx.workerThread().intValue().getText());
-        int batchSize = Integer.parseInt(ctx.batchSize().intValue().getText());
-        AlgorithmSegment rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+        Integer workerThread = getWorkerThread(ctx.workerThread());
+        Integer batchSize = getBatchSize(ctx.batchSize());
+        AlgorithmSegment rateLimiter = null;
+        if (null != ctx.rateLimiter()) {
+            rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+        }
         return new InputOrOutputSegment(workerThread, batchSize, rateLimiter);
     }
     
     @Override
     public ASTNode visitOutputDefinition(final OutputDefinitionContext ctx) {
-        int workerThread = Integer.parseInt(ctx.workerThread().intValue().getText());
-        int batchSize = Integer.parseInt(ctx.batchSize().intValue().getText());
-        AlgorithmSegment rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+        Integer workerThread = getWorkerThread(ctx.workerThread());
+        Integer batchSize = getBatchSize(ctx.batchSize());
+        AlgorithmSegment rateLimiter = null;
+        if (null != ctx.rateLimiter()) {
+            rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+        }
         return new InputOrOutputSegment(workerThread, batchSize, rateLimiter);
     }
     
+    private Integer getWorkerThread(final WorkerThreadContext ctx) {
+        if (null == ctx) {
+            return null;
+        }
+        return Integer.parseInt(ctx.intValue().getText());
+    }
+    
+    private Integer getBatchSize(final BatchSizeContext ctx) {
+        if (null == ctx) {
+            return null;
+        }
+        return Integer.parseInt(ctx.intValue().getText());
+    }
+    
     @Override
     public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
         return visit(ctx.algorithmDefinition());
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
index 35d5c92..67c3d4b 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
@@ -29,9 +29,9 @@ import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
 @Getter
 public final class InputOrOutputSegment implements ASTNode {
 
-    private final int workerThread;
+    private final Integer workerThread;
 
-    private final int batchSize;
+    private final Integer batchSize;
     
     private final AlgorithmSegment rateLimiter;
 }