You are viewing a plain text version of this content. The canonical link to the original message is not included in this plain-text copy.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/15 03:30:22 UTC

[rocketmq] branch snode updated: Add request qps and request size per second flow control strategy

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 74a5b3f  Add request qps and request size per second flow control strategy
74a5b3f is described below

commit 74a5b3f5d4e2a1e489e0242e2e6b8cd9cb63f5d5
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Jan 15 11:29:59 2019 +0800

    Add request qps and request size per second flow control strategy
---
 .../flowcontrol/AbstractFlowControlService.java    | 54 ++++++++++++++--------
 .../flowcontrol/QPSFlowControlServiceImpl.java     |  2 +-
 ...java => RequestSizeFlowControlServiceImpl.java} | 41 ++++++++--------
 .../snode/offset/ConsumerOffsetManager.java        |  2 +-
 ...tmq.snode.interceptor.RemotingServerInterceptor |  3 +-
 5 files changed, 57 insertions(+), 45 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
index fc4ef40..a2d8adc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
+++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java
@@ -17,12 +17,13 @@
 package org.apache.rocketmq.common.flowcontrol;
 
 import com.alibaba.csp.sentinel.SphO;
-import com.alibaba.csp.sentinel.slots.block.RuleConstant;
 import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
 import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -36,12 +37,14 @@ public abstract class AbstractFlowControlService implements Interceptor {
     private final ThreadLocal<Boolean> acquiredThreadLocal = new ThreadLocal<Boolean>();
     private final FlowControlConfig flowControlConfig;
 
+    public final String flowControlNameSeparator = "@";
+
     public AbstractFlowControlService() {
         this.flowControlConfig = new FlowControlConfig();
         loadRules(this.flowControlConfig);
     }
 
-    public abstract String getResourceKey(RequestContext requestContext);
+    public abstract String getResourceName(RequestContext requestContext);
 
     public abstract int getResourceCount(RequestContext requestContext);
 
@@ -51,23 +54,24 @@ public abstract class AbstractFlowControlService implements Interceptor {
 
     @Override
     public void beforeRequest(RequestContext requestContext) {
-        String resourceKey = getResourceKey(requestContext);
+        String resourceName = getResourceName(requestContext);
+        String flowControlType = getFlowControlType();
         int resourceCount = getResourceCount(requestContext);
+        String resourceKey = buildResourceName(flowControlType, resourceName);
+        log.info("resourceKey: {} resourceCount: {}", resourceKey, resourceCount);
         resourceCount = resourceCount == 0 ? 1 : resourceCount;
-        if (resourceKey != null) {
-            boolean acquired = SphO.entry(resourceKey, resourceCount);
-            if (acquired) {
-                this.acquiredThreadLocal.set(true);
-            } else {
-                rejectRequest(requestContext);
-            }
+        boolean acquired = SphO.entry(resourceKey, resourceCount);
+        if (acquired) {
+            this.acquiredThreadLocal.set(true);
+        } else {
+            rejectRequest(requestContext);
         }
     }
 
     @Override
     public void afterRequest(ResponseContext responseContext) {
         Boolean acquired = this.acquiredThreadLocal.get();
-        if (acquired != null && acquired == true) {
+        if (acquired != null && acquired) {
             SphO.exit();
         }
     }
@@ -75,7 +79,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
     @Override
     public void onException(ExceptionContext exceptionContext) {
         Boolean acquired = this.acquiredThreadLocal.get();
-        if (acquired != null && acquired == true) {
+        if (acquired != null && acquired) {
             SphO.exit();
         }
     }
@@ -94,27 +98,39 @@ public abstract class AbstractFlowControlService implements Interceptor {
                 log.warn("Get flow control config null by moduleName: {} ", moduleName);
             }
         } else {
-            log.warn("flowControlConfig is null ");
+            log.warn("FlowControlConfig is null ");
         }
         return null;
     }
 
+    private String buildResourceName(String flowControlType, String flowControlResourceName) {
+        StringBuffer sb = new StringBuffer(32);
+        sb.append(flowControlType).append(flowControlNameSeparator).append(flowControlResourceName);
+        return sb.toString();
+    }
+
     private void loadRules(FlowControlConfig flowControlConfig) {
         Map<String, Map<String, List<FlowControlRule>>> rules = flowControlConfig.getPlainFlowControlRules();
-        for (Map<String, List<FlowControlRule>> flowControlTypeMap : rules.values()) {
-            for (List<FlowControlRule> list : flowControlTypeMap.values()) {
+        List<FlowRule> sentinelRules = new ArrayList<FlowRule>();
+        for (Map<String, List<FlowControlRule>> rulesMap : rules.values()) {
+            Set<Map.Entry<String, List<FlowControlRule>>> entrySet = rulesMap.entrySet();
+            Iterator iterator = entrySet.iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, List<FlowControlRule>> entry = (Map.Entry<String, List<FlowControlRule>>) iterator.next();
+                String flowControlType = entry.getKey();
+                List<FlowControlRule> list = entry.getValue();
                 for (FlowControlRule flowControlRule : list) {
-                    List<FlowRule> sentinelRules = new ArrayList<FlowRule>();
                     FlowRule rule1 = new FlowRule();
-                    rule1.setResource(flowControlRule.getFlowControlResourceName());
+                    rule1.setResource(buildResourceName(flowControlType, flowControlRule.getFlowControlResourceName()));
                     rule1.setCount(flowControlRule.getFlowControlResourceCount());
                     rule1.setGrade(flowControlRule.getFlowControlGrade());
                     rule1.setLimitApp("default");
                     sentinelRules.add(rule1);
-                    FlowRuleManager.loadRules(sentinelRules);
                 }
             }
         }
+        FlowRuleManager.loadRules(sentinelRules);
+        log.warn("Load Rules: {}" + FlowRuleManager.getRules());
     }
-
 }
+
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
index 96a1e96..743ac9b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
@@ -38,7 +38,7 @@ public class QPSFlowControlServiceImpl extends AbstractFlowControlService {
     }
 
     @Override
-    public String getResourceKey(RequestContext requestContext) {
+    public String getResourceName(RequestContext requestContext) {
         if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) {
             return null;
         }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java
similarity index 70%
copy from snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
copy to snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java
index 96a1e96..9088694 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/RequestSizeFlowControlServiceImpl.java
@@ -16,55 +16,50 @@
  */
 package org.apache.rocketmq.snode.flowcontrol;
 
+import com.sun.org.apache.bcel.internal.generic.IF_ACMPEQ;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService;
-import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingRuntimeException;
 import org.apache.rocketmq.remoting.interceptor.RequestContext;
 import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 
-public class QPSFlowControlServiceImpl extends AbstractFlowControlService {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-
+public class RequestSizeFlowControlServiceImpl extends AbstractFlowControlService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
     private final AtomicLong logCount = new AtomicLong(0);
+    private final String flowControlType = "sizeLimit";
 
-    private final String flowControlType = "countLimit";
-
-    public QPSFlowControlServiceImpl() {
-        super();
+    @Override
+    public String getResourceName(RequestContext requestContext) {
+        return requestContext.getRequest().getCode() + "";
     }
 
+    /**
+     * @param requestContext
+     * @return Size of request KB
+     */
     @Override
-    public String getResourceKey(RequestContext requestContext) {
-        if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) {
-            return null;
-        }
-        return requestContext.getRequest().getCode() + "";
+    public int getResourceCount(RequestContext requestContext) {
+        return requestContext.getRequest().getBody().length / 1024;
     }
 
     @Override
     public String getFlowControlType() {
-        return this.flowControlType;
+        return flowControlType;
     }
 
     @Override
     public void rejectRequest(RequestContext requestContext) {
         if (logCount.getAndIncrement() % 100 == 0) {
-            log.warn("[REJECT]exceed system flow control config QPS, start flow control for a while: requestContext: {} ", requestContext);
+            log.warn("[REJECT]exceed system flow control config request size, start flow control for a while: requestContext: {} ", requestContext);
         }
-        throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config QPS, start flow control for a while");
+        throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config request size, start flow control for a while");
     }
 
     @Override
     public String interceptorName() {
-        return "snodeQPSFlowControlInterceptor";
-    }
-
-    @Override
-    public int getResourceCount(RequestContext requestContext) {
-        return 1;
+        return "requestSizeFlowControlInterceptor";
     }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index e295a15..3d53ef1 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.exception.SnodeException;
 
 public class ConsumerOffsetManager {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
     private static final String TOPIC_GROUP_SEPARATOR = "@";
 
     private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor
index 5b1dd52..9c742a2 100644
--- a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor
+++ b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor
@@ -1 +1,2 @@
-org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl
\ No newline at end of file
+org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl
+org.apache.rocketmq.snode.flowcontrol.RequestSizeFlowControlServiceImpl
\ No newline at end of file