You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/15 03:30:22 UTC
[rocketmq] branch snode updated: Add request qps and request size per second flow control strategy
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 74a5b3f Add request qps and request size per second flow control strategy
74a5b3f is described below
commit 74a5b3f5d4e2a1e489e0242e2e6b8cd9cb63f5d5
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Jan 15 11:29:59 2019 +0800
Add request qps and request size per second flow control strategy
---
.../flowcontrol/AbstractFlowControlService.java | 54 ++++++++++++++--------
.../flowcontrol/QPSFlowControlServiceImpl.java | 2 +-
...java => RequestSizeFlowControlServiceImpl.java} | 41 ++++++++--------
.../snode/offset/ConsumerOffsetManager.java | 2 +-
...tmq.snode.interceptor.RemotingServerInterceptor | 3 +-
5 files changed, 57 insertions(+), 45 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
index fc4ef40..a2d8adc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
+++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
@@ -17,12 +17,13 @@
package org.apache.rocketmq.common.flowcontrol;
import com.alibaba.csp.sentinel.SphO;
-import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -36,12 +37,14 @@ public abstract class AbstractFlowControlService implements Interceptor {
private final ThreadLocal<Boolean> acquiredThreadLocal = new ThreadLocal<Boolean>();
private final FlowControlConfig flowControlConfig;
+ public final String flowControlNameSeparator = "@";
+
public AbstractFlowControlService() {
this.flowControlConfig = new FlowControlConfig();
loadRules(this.flowControlConfig);
}
- public abstract String getResourceKey(RequestContext requestContext);
+ public abstract String getResourceName(RequestContext requestContext);
public abstract int getResourceCount(RequestContext requestContext);
@@ -51,23 +54,24 @@ public abstract class AbstractFlowControlService implements Interceptor {
@Override
public void beforeRequest(RequestContext requestContext) {
- String resourceKey = getResourceKey(requestContext);
+ String resourceName = getResourceName(requestContext);
+ String flowControlType = getFlowControlType();
int resourceCount = getResourceCount(requestContext);
+ String resourceKey = buildResourceName(flowControlType, resourceName);
+ log.info("resourceKey: {} resourceCount: {}", resourceKey, resourceCount);
resourceCount = resourceCount == 0 ? 1 : resourceCount;
- if (resourceKey != null) {
- boolean acquired = SphO.entry(resourceKey, resourceCount);
- if (acquired) {
- this.acquiredThreadLocal.set(true);
- } else {
- rejectRequest(requestContext);
- }
+ boolean acquired = SphO.entry(resourceKey, resourceCount);
+ if (acquired) {
+ this.acquiredThreadLocal.set(true);
+ } else {
+ rejectRequest(requestContext);
}
}
@Override
public void afterRequest(ResponseContext responseContext) {
Boolean acquired = this.acquiredThreadLocal.get();
- if (acquired != null && acquired == true) {
+ if (acquired != null && acquired) {
SphO.exit();
}
}
@@ -75,7 +79,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
@Override
public void onException(ExceptionContext exceptionContext) {
Boolean acquired = this.acquiredThreadLocal.get();
- if (acquired != null && acquired == true) {
+ if (acquired != null && acquired) {
SphO.exit();
}
}
@@ -94,27 +98,39 @@ public abstract class AbstractFlowControlService implements Interceptor {
log.warn("Get flow control config null by moduleName: {} ", moduleName);
}
} else {
- log.warn("flowControlConfig is null ");
+ log.warn("FlowControlConfig is null ");
}
return null;
}
+ private String buildResourceName(String flowControlType, String flowControlResourceName) {
+ StringBuffer sb = new StringBuffer(32);
+ sb.append(flowControlType).append(flowControlNameSeparator).append(flowControlResourceName);
+ return sb.toString();
+ }
+
private void loadRules(FlowControlConfig flowControlConfig) {
Map<String, Map<String, List<FlowControlRule>>> rules = flowControlConfig.getPlainFlowControlRules();
- for (Map<String, List<FlowControlRule>> flowControlTypeMap : rules.values()) {
- for (List<FlowControlRule> list : flowControlTypeMap.values()) {
+ List<FlowRule> sentinelRules = new ArrayList<FlowRule>();
+ for (Map<String, List<FlowControlRule>> rulesMap : rules.values()) {
+ Set<Map.Entry<String, List<FlowControlRule>>> entrySet = rulesMap.entrySet();
+ Iterator iterator = entrySet.iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, List<FlowControlRule>> entry = (Map.Entry<String, List<FlowControlRule>>) iterator.next();
+ String flowControlType = entry.getKey();
+ List<FlowControlRule> list = entry.getValue();
for (FlowControlRule flowControlRule : list) {
- List<FlowRule> sentinelRules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
- rule1.setResource(flowControlRule.getFlowControlResourceName());
+ rule1.setResource(buildResourceName(flowControlType, flowControlRule.getFlowControlResourceName()));
rule1.setCount(flowControlRule.getFlowControlResourceCount());
rule1.setGrade(flowControlRule.getFlowControlGrade());
rule1.setLimitApp("default");
sentinelRules.add(rule1);
- FlowRuleManager.loadRules(sentinelRules);
}
}
}
+ FlowRuleManager.loadRules(sentinelRules);
+ log.warn("Load Rules: {}" + FlowRuleManager.getRules());
}
-
}
+
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
index 96a1e96..743ac9b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
@@ -38,7 +38,7 @@ public class QPSFlowControlServiceImpl extends AbstractFlowControlService {
}
@Override
- public String getResourceKey(RequestContext requestContext) {
+ public String getResourceName(RequestContext requestContext) {
if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) {
return null;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java
similarity index 70%
copy from snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
copy to snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java
index 96a1e96..9088694 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java
@@ -16,55 +16,50 @@
*/
package org.apache.rocketmq.snode.flowcontrol;
+import com.sun.org.apache.bcel.internal.generic.IF_ACMPEQ;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService;
-import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingRuntimeException;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
-public class QPSFlowControlServiceImpl extends AbstractFlowControlService {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-
+public class RequestSizeFlowControlServiceImpl extends AbstractFlowControlService {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final AtomicLong logCount = new AtomicLong(0);
+ private final String flowControlType = "sizeLimit";
- private final String flowControlType = "countLimit";
-
- public QPSFlowControlServiceImpl() {
- super();
+ @Override
+ public String getResourceName(RequestContext requestContext) {
+ return requestContext.getRequest().getCode() + "";
}
+ /**
+ * @param requestContext
+ * @return Size of request KB
+ */
@Override
- public String getResourceKey(RequestContext requestContext) {
- if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) {
- return null;
- }
- return requestContext.getRequest().getCode() + "";
+ public int getResourceCount(RequestContext requestContext) {
+ return requestContext.getRequest().getBody().length / 1024;
}
@Override
public String getFlowControlType() {
- return this.flowControlType;
+ return flowControlType;
}
@Override
public void rejectRequest(RequestContext requestContext) {
if (logCount.getAndIncrement() % 100 == 0) {
- log.warn("[REJECT]exceed system flow control config QPS, start flow control for a while: requestContext: {} ", requestContext);
+ log.warn("[REJECT]exceed system flow control config request size, start flow control for a while: requestContext: {} ", requestContext);
}
- throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config QPS, start flow control for a while");
+ throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config request size, start flow control for a while");
}
@Override
public String interceptorName() {
- return "snodeQPSFlowControlInterceptor";
- }
-
- @Override
- public int getResourceCount(RequestContext requestContext) {
- return 1;
+ return "requestSizeFlowControlInterceptor";
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index e295a15..3d53ef1 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.exception.SnodeException;
public class ConsumerOffsetManager {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor
index 5b1dd52..9c742a2 100644
--- a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor
+++ b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor
@@ -1 +1,2 @@
-org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl
\ No newline at end of file
+org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl
+org.apache.rocketmq.snode.flowcontrol.RequestSizeFlowControlServiceImpl
\ No newline at end of file