You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/02/17 01:57:37 UTC
[shardingsphere] branch master updated: [DistSQL] Make workerThread, batchSize and rateLimiter optional in sharding scaling rule. (#15456)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1aacef7 [DistSQL] Make workerThread, batchSize and rateLimiter optional in sharding scaling rule. (#15456)
1aacef7 is described below
commit 1aacef7ff91d5ebc84481a69a158ca2418352b88
Author: Raigor <ra...@gmail.com>
AuthorDate: Thu Feb 17 09:56:29 2022 +0800
[DistSQL] Make workerThread, batchSize and rateLimiter optional in sharding scaling rule. (#15456)
---
.../syntax/rdl/rule-definition/sharding.cn.md | 4 +--
.../syntax/rdl/rule-definition/sharding.en.md | 4 +--
.../CreateShardingScalingRuleStatementUpdater.java | 18 +++++++-----
.../main/antlr4/imports/scaling/RDLStatement.g4 | 4 +--
.../parser/core/ScalingSQLStatementVisitor.java | 34 ++++++++++++++++++----
.../statement/segment/InputOrOutputSegment.java | 4 +--
6 files changed, 46 insertions(+), 22 deletions(-)
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
index 7ab94b4..03b306f 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.cn.md
@@ -154,10 +154,10 @@ scalingRuleDefinition:
[inputDefinition] [, outputDefinition] [, streamChannel] [, completionDetector] [, dataConsistencyChecker]
inputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ INPUT ([workerThread] [, batchSize] [, rateLimiter])
outputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ OUTPUT ([workerThread] [, batchSize] [, rateLimiter])
completionDetector:
COMPLETION_DETECTOR (algorithmDefinition)
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
index 7e20f93..e7e586b 100644
--- a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
+++ b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/rdl/rule-definition/sharding.en.md
@@ -154,10 +154,10 @@ scalingRuleDefinition:
[inputDefinition] [, outputDefinition] [, streamChannel] [, completionDetector] [, dataConsistencyChecker]
inputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ INPUT ([workerThread] [, batchSize] [, rateLimiter])
outputDefinition:
- INPUT (workerThread, batchSize, rateLimiter)
+ OUTPUT ([workerThread] [, batchSize] [, rateLimiter])
completionDetector:
COMPLETION_DETECTOR (algorithmDefinition)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
index 20dc278..4f9e0b0 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
@@ -61,7 +61,7 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
String schemaName = shardingSphereMetaData.getName();
checkCurrentRuleConfiguration(schemaName, currentRuleConfig);
checkDuplicate(schemaName, sqlStatement, currentRuleConfig);
- checkAlgorithms(sqlStatement);
+ checkAlgorithms(sqlStatement.getConfigurationSegment());
}
private void checkCurrentRuleConfiguration(final String schemaName, final ShardingRuleConfiguration currentRuleConfig) throws RequiredRuleMissedException {
@@ -76,14 +76,14 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
}
}
- private void checkAlgorithms(final CreateShardingScalingRuleStatement sqlStatement) throws DistSQLException {
- if (null == sqlStatement.getConfigurationSegment()) {
+ private void checkAlgorithms(final ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
+ if (null == segment) {
return;
}
- checkRateLimiterExist(sqlStatement.getConfigurationSegment());
- checkStreamChannelExist(sqlStatement.getConfigurationSegment());
- checkCompletionDetectorExist(sqlStatement.getConfigurationSegment());
- checkDataConsistencyCheckerExist(sqlStatement.getConfigurationSegment());
+ checkRateLimiterExist(segment);
+ checkStreamChannelExist(segment);
+ checkCompletionDetectorExist(segment);
+ checkDataConsistencyCheckerExist(segment);
}
private void checkRateLimiterExist(final ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
@@ -96,7 +96,9 @@ public final class CreateShardingScalingRuleStatementUpdater implements RuleDefi
}
private void checkRateLimiterAlgorithm(final AlgorithmSegment rateLimiter) throws DistSQLException {
- checkAlgorithm(RATE_LIMIT_ALGORITHM_HOLDER, "rate limiter", rateLimiter);
+ if (null != rateLimiter) {
+ checkAlgorithm(RATE_LIMIT_ALGORITHM_HOLDER, "rate limiter", rateLimiter);
+ }
}
private void checkStreamChannelExist(final ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4 b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
index b022a6d..6ed1bf8 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/antlr4/imports/scaling/RDLStatement.g4
@@ -44,11 +44,11 @@ scalingRuleDefinition
;
inputDefinition
- : INPUT LP workerThread COMMA batchSize COMMA rateLimiter RP
+ : INPUT LP workerThread? (COMMA? batchSize)? (COMMA? rateLimiter)? RP
;
outputDefinition
- : OUTPUT LP workerThread COMMA batchSize COMMA rateLimiter RP
+ : OUTPUT LP workerThread? (COMMA? batchSize)? (COMMA? rateLimiter)? RP
;
completionDetector
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
index 0f804cb..bef5785 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-parser/src/main/java/org/apache/shardingsphere/scaling/distsql/parser/core/ScalingSQLStatementVisitor.java
@@ -21,6 +21,7 @@ import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementBaseVisi
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser;
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.AlgorithmDefinitionContext;
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.ApplyScalingContext;
+import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.BatchSizeContext;
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CheckScalingContext;
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CompletionDetectorContext;
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.CreateShardingScalingRuleContext;
@@ -43,6 +44,7 @@ import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.S
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StopScalingContext;
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StopScalingSourceWritingContext;
import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.StreamChannelContext;
+import org.apache.shardingsphere.distsql.parser.autogen.ScalingStatementParser.WorkerThreadContext;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.scaling.distsql.statement.ApplyScalingStatement;
import org.apache.shardingsphere.scaling.distsql.statement.CheckScalingStatement;
@@ -159,20 +161,40 @@ public final class ScalingSQLStatementVisitor extends ScalingStatementBaseVisito
@Override
public ASTNode visitInputDefinition(final InputDefinitionContext ctx) {
- int workerThread = Integer.parseInt(ctx.workerThread().intValue().getText());
- int batchSize = Integer.parseInt(ctx.batchSize().intValue().getText());
- AlgorithmSegment rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+ Integer workerThread = getWorkerThread(ctx.workerThread());
+ Integer batchSize = getBatchSize(ctx.batchSize());
+ AlgorithmSegment rateLimiter = null;
+ if (null != ctx.rateLimiter()) {
+ rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+ }
return new InputOrOutputSegment(workerThread, batchSize, rateLimiter);
}
@Override
public ASTNode visitOutputDefinition(final OutputDefinitionContext ctx) {
- int workerThread = Integer.parseInt(ctx.workerThread().intValue().getText());
- int batchSize = Integer.parseInt(ctx.batchSize().intValue().getText());
- AlgorithmSegment rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+ Integer workerThread = getWorkerThread(ctx.workerThread());
+ Integer batchSize = getBatchSize(ctx.batchSize());
+ AlgorithmSegment rateLimiter = null;
+ if (null != ctx.rateLimiter()) {
+ rateLimiter = (AlgorithmSegment) visit(ctx.rateLimiter());
+ }
return new InputOrOutputSegment(workerThread, batchSize, rateLimiter);
}
+ private Integer getWorkerThread(final WorkerThreadContext ctx) {
+ if (null == ctx) {
+ return null;
+ }
+ return Integer.parseInt(ctx.intValue().getText());
+ }
+
+ private Integer getBatchSize(final BatchSizeContext ctx) {
+ if (null == ctx) {
+ return null;
+ }
+ return Integer.parseInt(ctx.intValue().getText());
+ }
+
@Override
public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
return visit(ctx.algorithmDefinition());
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
index 35d5c92..67c3d4b 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-statement/src/main/java/org/apache/shardingsphere/scaling/distsql/statement/segment/InputOrOutputSegment.java
@@ -29,9 +29,9 @@ import org.apache.shardingsphere.sql.parser.api.visitor.ASTNode;
@Getter
public final class InputOrOutputSegment implements ASTNode {
- private final int workerThread;
+ private final Integer workerThread;
- private final int batchSize;
+ private final Integer batchSize;
private final AlgorithmSegment rateLimiter;
}