You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/12/04 05:05:28 UTC
[servicecomb-java-chassis] branch 1.3.x updated: [SCB-2135]merge
flow control function from 2.1.x to 1.3.x (#2100)
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new c0eb2d9 [SCB-2135]merge flow control function from 2.1.x to 1.3.x (#2100)
c0eb2d9 is described below
commit c0eb2d9f79500f87acf75f24d7c5db9b0410c800
Author: bao liu <bi...@qq.com>
AuthorDate: Fri Dec 4 13:05:16 2020 +0800
[SCB-2135]merge flow control function from 2.1.x to 1.3.x (#2100)
---
.../servicecomb/demo/pojo/client/PojoClient.java | 6 +
.../demo/pojo/client/TestFlowControl.java | 82 +++++++
.../src/main/resources/microservice.yaml | 10 +-
.../demo/pojo/server/FlowControlClientSchema.java | 31 +++
.../demo/pojo/server/FlowControlSchema.java | 31 +++
.../src/main/resources/microservice.yaml | 17 +-
.../java/org/apache/servicecomb/qps/Config.java | 37 +++-
.../qps/ConsumerQpsFlowControlHandler.java | 7 +-
.../qps/ProviderQpsFlowControlHandler.java | 16 +-
.../servicecomb/qps/QpsControllerManager.java | 218 +++++++++++++------
.../org/apache/servicecomb/qps/QpsStrategy.java | 25 +++
.../qps/strategy/AbstractQpsStrategy.java | 60 +++++
.../qps/strategy/DefaultStrategyFactory.java | 33 +++
.../FixedWindowStrategy.java} | 37 +---
.../servicecomb/qps/strategy/IStrategyFactory.java | 23 ++
.../qps/strategy/LeakyBucketStrategy.java | 70 ++++++
.../qps/strategy/TokenBucketStrategy.java | 28 +++
...pache.servicecomb.qps.strategy.IStrategyFactory | 18 ++
.../servicecomb/qps/QpsControllerManagerTest.java | 241 ++++++++++++---------
.../org/apache/servicecomb/qps/TestConfig.java | 4 -
.../qps/TestConsumerQpsFlowControlHandler.java | 54 ++---
.../qps/TestProviderQpsFlowControlHandler.java | 50 ++---
.../apache/servicecomb/qps/TestQpsStrategy.java | 56 +++++
23 files changed, 871 insertions(+), 283 deletions(-)
diff --git a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java
index ccb01ca..e325391 100644
--- a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java
+++ b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java
@@ -108,6 +108,7 @@ public class PojoClient {
TestMgr.setMsg(microserviceName, transport);
LOGGER.info("test {}, transport {}", microserviceName, transport);
+ testFlowControl();
testNull(testFromXml);
testNull(test);
testEmpty(test);
@@ -240,6 +241,11 @@ public class PojoClient {
TestMgr.check("code is ''", test.getTestString(""));
}
+ private static void testFlowControl() throws Exception {
+ TestFlowControl flowControl = BeanUtils.getBean("TestFlowControl");
+ flowControl.testAllTransport();
+ }
+
private static void testNull(Test test) {
TestMgr.check("code is 'null'", test.getTestString(null));
TestMgr.check(null, test.wrapParam(null));
diff --git a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java
new file mode 100644
index 0000000..b558d22
--- /dev/null
+++ b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/TestFlowControl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.demo.pojo.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+
+import org.apache.servicecomb.demo.TestMgr;
+import org.apache.servicecomb.provider.pojo.RpcReference;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.springframework.stereotype.Component;
+
+@Component("TestFlowControl")
+public class TestFlowControl {
+ interface Client {
+ int foo(int num);
+
+ int bar(int num);
+ }
+
+ @RpcReference(microserviceName = "pojo", schemaId = "FlowControlSchema")
+ Client client1;
+
+ @RpcReference(microserviceName = "pojo", schemaId = "FlowControlClientSchema")
+ Client client2;
+
+ public void testAllTransport() throws Exception {
+ // 1.3.2 未统一。 2.1.5 统一了。
+ String serverMsg = "InvocationException: code=429;msg={message=rejected by qps flowcontrol}";
+ String clientMsg = "InvocationException: code=429;msg=CommonExceptionData [message=rejected by qps flowcontrol]";
+
+ testFlowControl((num) -> client1.foo(num), true, serverMsg);
+ testFlowControl((num) -> client1.bar(num), false, serverMsg);
+ testFlowControl((num) -> client2.foo(num), true, clientMsg);
+ testFlowControl((num) -> client2.bar(num), false, clientMsg);
+ }
+
+ private void testFlowControl(Function<Integer, Integer> function, boolean expected, String message)
+ throws InterruptedException {
+ AtomicBoolean failed = new AtomicBoolean(false);
+ CountDownLatch countDownLatch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ new Thread() {
+ public void run() {
+ for (int i = 0; i < 10; i++) {
+ try {
+ int result = function.apply(10);
+ if (result != 10) {
+ TestMgr.failed("", new Exception("not expected"));
+ }
+ } catch (InvocationException e) {
+ TestMgr.check(e.getStatusCode(), 429);
+ TestMgr.check(e.getMessage(), message);
+ failed.set(true);
+ break;
+ }
+ }
+ countDownLatch.countDown();
+ }
+ }.start();
+ }
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ TestMgr.check(expected, failed.get());
+ }
+}
diff --git a/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml b/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml
index 47b01f5..fd5b8e3 100644
--- a/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml
+++ b/demo/demo-pojo/pojo-client/src/main/resources/microservice.yaml
@@ -38,4 +38,12 @@ servicecomb:
enabled: false
loadbalance:
strategy:
- name: Random
\ No newline at end of file
+ name: Random
+ flowcontrol:
+ Consumer:
+ qps:
+ limit:
+ pojo:
+ FlowControlClientSchema:
+ foo: 3
+ bar: 3000
\ No newline at end of file
diff --git a/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java
new file mode 100644
index 0000000..d94bcba
--- /dev/null
+++ b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlClientSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.demo.pojo.server;
+
+import org.apache.servicecomb.provider.pojo.RpcSchema;
+
+@RpcSchema(schemaId = "FlowControlClientSchema")
+public class FlowControlClientSchema {
+ public int foo(int num) {
+ return num;
+ }
+
+ public int bar(int num) {
+ return num;
+ }
+}
diff --git a/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java
new file mode 100644
index 0000000..ba2c293
--- /dev/null
+++ b/demo/demo-pojo/pojo-server/src/main/java/org/apache/servicecomb/demo/pojo/server/FlowControlSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.demo.pojo.server;
+
+import org.apache.servicecomb.provider.pojo.RpcSchema;
+
+@RpcSchema(schemaId = "FlowControlSchema")
+public class FlowControlSchema {
+ public int foo(int num) {
+ return num;
+ }
+
+ public int bar(int num) {
+ return num;
+ }
+}
diff --git a/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml b/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml
index e705183..a18bb7e 100644
--- a/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml
+++ b/demo/demo-pojo/pojo-server/src/main/resources/microservice.yaml
@@ -27,9 +27,16 @@ servicecomb:
address: 0.0.0.0:8080?protocol=http2
highway:
address: 0.0.0.0:7070
- #executors:
- #default: test
- #Provider:
- #server: test
- #server.wrapParam: test
+ handler:
+ chain:
+ Provider:
+ default: qps-flowcontrol-provider
+ flowcontrol:
+ Provider:
+ qps:
+ limit:
+ ANY:
+ FlowControlSchema:
+ foo: 3
+ bar: 3000
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java
index 2c6d69a..8078428 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/Config.java
@@ -26,6 +26,20 @@ import com.netflix.config.DynamicPropertyFactory;
public final class Config {
private static final Logger LOGGER = LoggerFactory.getLogger(Config.class);
+ public static final String STRATEGY_KEY = "servicecomb.flowcontrol.strategy";
+
+ public static final String ANY_SERVICE = "ANY";
+
+ public static final String CONSUMER_BUCKET_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.bucket.";
+
+ public static final String PROVIDER_BUCKET_KEY_PREFIX = "servicecomb.flowcontrol.Provider.qps.bucket.";
+
+ public static final String PROVIDER_BUCKET_KEY_GLOBAL =
+ "servicecomb.flowcontrol.Provider.qps.global.bucket";
+
+ public static final String CONSUMER_BUCKET_KEY_GLOBAL =
+ "servicecomb.flowcontrol.Consumer.qps.global.bucket";
+
public static final String CONSUMER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Consumer.qps.limit.";
public static final String PROVIDER_LIMIT_KEY_PREFIX = "servicecomb.flowcontrol.Provider.qps.limit.";
@@ -33,35 +47,38 @@ public final class Config {
public static final String PROVIDER_LIMIT_KEY_GLOBAL =
"servicecomb.flowcontrol.Provider.qps.global.limit";
+ public static final String CONSUMER_LIMIT_KEY_GLOBAL =
+ "servicecomb.flowcontrol.Consumer.qps.global.limit";
+
public static final String CONSUMER_ENABLED = "servicecomb.flowcontrol.Consumer.qps.enabled";
public static final String PROVIDER_ENABLED = "servicecomb.flowcontrol.Provider.qps.enabled";
public static Config INSTANCE = new Config();
- private final DynamicBooleanProperty consumerEanbled =
+ private final DynamicBooleanProperty consumerEnabled =
DynamicPropertyFactory.getInstance().getBooleanProperty(CONSUMER_ENABLED, true);
- private final DynamicBooleanProperty providerEanbled =
+ private final DynamicBooleanProperty providerEnabled =
DynamicPropertyFactory.getInstance().getBooleanProperty(PROVIDER_ENABLED, true);
public Config() {
- consumerEanbled.addCallback(() -> {
- boolean newValue = consumerEanbled.get();
- LOGGER.info("{} changed from {} to {}", CONSUMER_ENABLED, consumerEanbled, newValue);
+ consumerEnabled.addCallback(() -> {
+ boolean newValue = consumerEnabled.get();
+ LOGGER.info("{} changed from {} to {}", CONSUMER_ENABLED, consumerEnabled, newValue);
});
- providerEanbled.addCallback(() -> {
- boolean newValue = providerEanbled.get();
- LOGGER.info("{} changed from {} to {}", PROVIDER_ENABLED, providerEanbled, newValue);
+ providerEnabled.addCallback(() -> {
+ boolean newValue = providerEnabled.get();
+ LOGGER.info("{} changed from {} to {}", PROVIDER_ENABLED, providerEnabled, newValue);
});
}
public boolean isConsumerEnabled() {
- return consumerEanbled.get();
+ return consumerEnabled.get();
}
public boolean isProviderEnabled() {
- return providerEanbled.get();
+ return providerEnabled.get();
}
}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
index bc82c6c..1d81311 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ConsumerQpsFlowControlHandler.java
@@ -28,8 +28,7 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
* Support 3 levels of microservice/schema/operation.
*/
public class ConsumerQpsFlowControlHandler implements Handler {
- static final QpsControllerManager qpsControllerMgr = new QpsControllerManager()
- .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX);
+ private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(false);
@Override
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
@@ -38,8 +37,8 @@ public class ConsumerQpsFlowControlHandler implements Handler {
return;
}
- QpsController qpsController = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation);
- if (qpsController.isLimitNewRequest()) {
+ QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(invocation.getMicroserviceName(), invocation);
+ if (qpsStrategy.isLimitNewRequest()) {
// return http status 429
CommonExceptionData errorData = new CommonExceptionData("rejected by qps flowcontrol");
asyncResp.consumerFail(
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
index 67ade94..0abdd47 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
@@ -23,12 +23,9 @@ import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
-import org.springframework.util.StringUtils;
public class ProviderQpsFlowControlHandler implements Handler {
- static final QpsControllerManager qpsControllerMgr = new QpsControllerManager()
- .setConfigKeyPrefix(Config.PROVIDER_LIMIT_KEY_PREFIX)
- .setGlobalQpsController(Config.PROVIDER_LIMIT_KEY_GLOBAL);
+ private final QpsControllerManager qpsControllerMgr = new QpsControllerManager(true);
@Override
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
@@ -46,15 +43,12 @@ public class ProviderQpsFlowControlHandler implements Handler {
}
String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE);
- QpsController qpsController =
- StringUtils.isEmpty(microserviceName)
- ? qpsControllerMgr.getGlobalQpsController()
- : qpsControllerMgr.getOrCreate(microserviceName, invocation);
- isLimitNewRequest(qpsController, asyncResp);
+ QpsStrategy qpsStrategy = qpsControllerMgr.getOrCreate(microserviceName, invocation);
+ isLimitNewRequest(qpsStrategy, asyncResp);
}
- private boolean isLimitNewRequest(QpsController qpsController, AsyncResponse asyncResp) {
- if (qpsController.isLimitNewRequest()) {
+ private boolean isLimitNewRequest(QpsStrategy qpsStrategy, AsyncResponse asyncResp) {
+ if (qpsStrategy.isLimitNewRequest()) {
CommonExceptionData errorData = new CommonExceptionData("rejected by qps flowcontrol");
asyncResp.producerFail(new InvocationException(QpsConst.TOO_MANY_REQUESTS_STATUS, errorData));
return true;
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
index d637e1f..57d1c96 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsControllerManager.java
@@ -17,11 +17,17 @@
package org.apache.servicecomb.qps;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
+import org.apache.servicecomb.foundation.common.exceptions.ServiceCombException;
+import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
+import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy;
+import org.apache.servicecomb.qps.strategy.IStrategyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,34 +36,82 @@ import com.netflix.config.DynamicProperty;
public class QpsControllerManager {
private static final Logger LOGGER = LoggerFactory.getLogger(QpsControllerManager.class);
+ public static final String SEPARATOR = ".";
+
/**
* Describe the relationship between configuration and qpsController.
*/
- protected final Map<String, QpsController> configQpsControllerMap = new ConcurrentHashMapEx<>();
+ private final Map<String, AbstractQpsStrategy> configQpsControllerMap = new ConcurrentHashMapEx<>();
/**
* Describe the relationship between qualifiedKey(format is "microservice.schema.operation") and qpsController.
*/
- protected final Map<String, QpsController> qualifiedNameControllerMap = new ConcurrentHashMapEx<>();
+ private final Map<String, AbstractQpsStrategy> qualifiedNameControllerMap = new ConcurrentHashMapEx<>();
- protected QpsController globalQpsController;
+ private AbstractQpsStrategy globalQpsStrategy;
- public static final String SEPARATOR = ".";
+ private final String limitKeyPrefix;
+
+ private final String bucketKeyPrefix;
+
+ private final String globalLimitKey;
- private String configKeyPrefix;
+ private final String globalBucketKey;
- public QpsController getOrCreate(String microserviceName, Invocation invocation) {
+ public QpsControllerManager(boolean isProvider) {
+ if (isProvider) {
+ limitKeyPrefix = Config.PROVIDER_LIMIT_KEY_PREFIX;
+ bucketKeyPrefix = Config.PROVIDER_BUCKET_KEY_PREFIX;
+ globalLimitKey = Config.PROVIDER_LIMIT_KEY_GLOBAL;
+ globalBucketKey = Config.PROVIDER_BUCKET_KEY_GLOBAL;
+ } else {
+ limitKeyPrefix = Config.CONSUMER_LIMIT_KEY_PREFIX;
+ bucketKeyPrefix = Config.CONSUMER_BUCKET_KEY_PREFIX;
+ globalLimitKey = Config.CONSUMER_LIMIT_KEY_GLOBAL;
+ globalBucketKey = Config.CONSUMER_BUCKET_KEY_GLOBAL;
+ }
+
+ initGlobalQpsController();
+ }
+
+ public QpsStrategy getOrCreate(String microserviceName, Invocation invocation) {
+ final String name = validatedName(microserviceName);
return qualifiedNameControllerMap
- .computeIfAbsent(microserviceName + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(), key -> {
- return create(key, microserviceName, invocation);
- });
+ .computeIfAbsent(
+ name + SEPARATOR + invocation.getOperationMeta().getSchemaQualifiedName(),
+ key -> create(key, name, invocation));
+ }
+
+ private String validatedName(String microserviceName) {
+ String name = microserviceName;
+ if (StringUtils.isEmpty(microserviceName)) {
+ name = Config.ANY_SERVICE;
+ }
+ return name;
}
/**
* Create relevant qpsLimit dynamicProperty and watch the configuration change.
* Search and return a valid qpsController.
*/
- protected QpsController create(String qualifiedNameKey, String microserviceName, Invocation invocation) {
+ private AbstractQpsStrategy create(String qualifiedNameKey, String microserviceName,
+ Invocation invocation) {
+ createForService(qualifiedNameKey, microserviceName, invocation);
+ String qualifiedAnyServiceName = Config.ANY_SERVICE + qualifiedNameKey.substring(microserviceName.length());
+ createForService(qualifiedAnyServiceName, Config.ANY_SERVICE, invocation);
+
+ AbstractQpsStrategy strategy = searchQpsController(qualifiedNameKey);
+ if (strategy == null) {
+ strategy = searchQpsController(qualifiedAnyServiceName);
+ }
+ if (strategy == null) {
+ return globalQpsStrategy;
+ }
+ return strategy;
+ }
+
+ private void createForService(String qualifiedNameKey, String microserviceName,
+ Invocation invocation) {
// create "microservice"
createQpsControllerIfNotExist(microserviceName);
// create "microservice.schema"
@@ -65,12 +119,10 @@ public class QpsControllerManager {
qualifiedNameKey.substring(0, microserviceName.length() + invocation.getSchemaId().length() + 1));
// create "microservice.schema.operation"
createQpsControllerIfNotExist(qualifiedNameKey);
-
- return searchQpsController(qualifiedNameKey);
}
/**
- * <p> Use qualifiedNameKey to search {@link QpsController}.
+ * <p> Use qualifiedNameKey to search {@link QpsStrategy}.
* Firstly try to search "microservice.schema.operation". If no valid result found, then try "microservice.schema",
* and then "microservice" or global qpsController(If there is a global qpsController).</p>
* <p> This method ensures that there is always an existing qpsController returned, as the relevant qpsController has
@@ -79,96 +131,130 @@ public class QpsControllerManager {
* @param qualifiedNameKey qualifiedNameKey in {@link #qualifiedNameControllerMap}
* @return a qps controller, lower level controllers with valid qpsLimit have priority.
*/
- protected QpsController searchQpsController(String qualifiedNameKey) {
- QpsController qpsController = configQpsControllerMap.get(qualifiedNameKey);
- if (isValidQpsController(qpsController)) {
- return qpsController;
+ private AbstractQpsStrategy searchQpsController(String qualifiedNameKey) {
+ AbstractQpsStrategy qpsStrategy = configQpsControllerMap.get(qualifiedNameKey);
+ if (isValidQpsController(qpsStrategy)) {
+ return qpsStrategy;
}
int index = qualifiedNameKey.lastIndexOf(SEPARATOR);
while (index > 0) {
- qpsController = configQpsControllerMap.get(qualifiedNameKey.substring(0, index));
- if (isValidQpsController(qpsController)) {
- return qpsController;
+ qpsStrategy = configQpsControllerMap.get(qualifiedNameKey.substring(0, index));
+ if (isValidQpsController(qpsStrategy)) {
+ return qpsStrategy;
}
index = qualifiedNameKey.lastIndexOf(SEPARATOR, index - 1);
}
- if (isValidQpsController(qpsController)) {
- return qpsController;
- }
-
- if (null != globalQpsController) {
- return globalQpsController;
+ if (isValidQpsController(qpsStrategy)) {
+ return qpsStrategy;
}
- // if null is returned, maybe the operation qps controller is not initiated correctly.
- // getOrCreateQpsController() should be invoked before.
- return qpsController;
+ return null;
}
- private boolean keyMatch(String configKey, Entry<String, QpsController> controllerEntry) {
+ private boolean keyMatch(String configKey, Entry<String, AbstractQpsStrategy> controllerEntry) {
return controllerEntry.getKey().equals(configKey)
|| controllerEntry.getKey().startsWith(configKey + SEPARATOR);
}
- private boolean isValidQpsController(QpsController qpsController) {
- return null != qpsController && null != qpsController.getQpsLimit();
+ private boolean isValidQpsController(AbstractQpsStrategy qpsStrategy) {
+ return null != qpsStrategy && null != qpsStrategy.getQpsLimit();
}
private void createQpsControllerIfNotExist(String configKey) {
- if (configQpsControllerMap.keySet().contains(configKey)) {
+ if (configQpsControllerMap.containsKey(configKey)) {
return;
}
LOGGER.info("Create qpsController, configKey = [{}]", configKey);
- DynamicProperty property = getDynamicProperty(configKey);
- QpsController qpsController = new QpsController(configKey, property.getInteger());
-
- configQpsControllerMap.put(configKey, qpsController);
-
- property.addCallback(() -> {
- qpsController.setQpsLimit(property.getInteger());
- LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey, property.getString());
+ DynamicProperty limitProperty = DynamicProperty.getInstance(limitKeyPrefix + configKey);
+ DynamicProperty bucketProperty = DynamicProperty.getInstance(bucketKeyPrefix + configKey);
+ DynamicProperty strategyProperty = DynamicProperty.getInstance(Config.STRATEGY_KEY);
+ AbstractQpsStrategy qpsStrategy = chooseStrategy(configKey, limitProperty.getLong(),
+ bucketProperty.getLong(), strategyProperty.getString());
+
+ strategyProperty.addCallback(() -> {
+ AbstractQpsStrategy innerQpsStrategy = chooseStrategy(configKey, limitProperty.getLong(),
+ bucketProperty.getLong(), strategyProperty.getString());
+ configQpsControllerMap.put(configKey, innerQpsStrategy);
+ LOGGER.info("Global flow control strategy update, value = [{}]",
+ strategyProperty.getString());
+ updateObjMap(configKey);
+ });
+ limitProperty.addCallback(() -> {
+ qpsStrategy.setQpsLimit(limitProperty.getLong());
+ LOGGER.info("Qps limit updated, configKey = [{}], value = [{}]", configKey,
+ limitProperty.getString());
+ updateObjMap(configKey);
+ });
+ bucketProperty.addCallback(() -> {
+ qpsStrategy.setBucketLimit(bucketProperty.getLong());
+ LOGGER.info("bucket limit updated, configKey = [{}], value = [{}]", configKey,
+ bucketProperty.getString());
updateObjMap(configKey);
});
+
+ configQpsControllerMap.put(configKey, qpsStrategy);
}
protected void updateObjMap(String configKey) {
- for (Entry<String, QpsController> controllerEntry : qualifiedNameControllerMap.entrySet()) {
+ for (Entry<String, AbstractQpsStrategy> controllerEntry : qualifiedNameControllerMap
+ .entrySet()) {
if (keyMatch(configKey, controllerEntry)) {
- QpsController qpsController = searchQpsController(controllerEntry.getKey());
- controllerEntry.setValue(qpsController);
+ AbstractQpsStrategy qpsStrategy = searchQpsController(controllerEntry.getKey());
+ controllerEntry.setValue(qpsStrategy);
LOGGER.info("QpsController updated, operationId = [{}], configKey = [{}], qpsLimit = [{}]",
- controllerEntry.getKey(), qpsController.getKey(), qpsController.getQpsLimit());
+ controllerEntry.getKey(), qpsStrategy.getKey(), qpsStrategy.getQpsLimit());
}
}
}
- public QpsControllerManager setConfigKeyPrefix(String configKeyPrefix) {
- this.configKeyPrefix = configKeyPrefix;
- return this;
- }
-
- public QpsControllerManager setGlobalQpsController(String globalConfigKey) {
- DynamicProperty globalQpsProperty = DynamicProperty.getInstance(globalConfigKey);
- QpsController qpsController = new QpsController(globalConfigKey, globalQpsProperty.getInteger());
-
- globalQpsProperty.addCallback(() -> {
- qpsController.setQpsLimit(globalQpsProperty.getInteger());
- LOGGER.info("Global qps limit update, value = [{}]", globalQpsProperty.getInteger());
+ private void initGlobalQpsController() {
+ DynamicProperty globalLimitProperty = DynamicProperty.getInstance(globalLimitKey);
+ DynamicProperty globalBucketProperty = DynamicProperty.getInstance(globalBucketKey);
+ DynamicProperty globalStrategyProperty = DynamicProperty
+ .getInstance(Config.STRATEGY_KEY);
+ globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong((long) Integer.MAX_VALUE),
+ globalBucketProperty.getLong(), globalStrategyProperty.getString());
+ globalStrategyProperty.addCallback(() -> {
+ globalQpsStrategy = chooseStrategy(globalLimitKey, globalLimitProperty.getLong((long) Integer.MAX_VALUE),
+ globalBucketProperty.getLong(), globalStrategyProperty.getString());
+ LOGGER.info("Global flow control strategy update, value = [{}]",
+ globalStrategyProperty.getString());
+ });
+ globalLimitProperty.addCallback(() -> {
+ globalQpsStrategy.setQpsLimit(globalLimitProperty.getLong((long) Integer.MAX_VALUE));
+ LOGGER.info("Global qps limit update, value = [{}]", globalLimitProperty.getLong());
+ });
+ globalBucketProperty.addCallback(() -> {
+ globalQpsStrategy.setBucketLimit(globalBucketProperty.getLong());
+ LOGGER.info("Global bucket limit update, value = [{}]", globalBucketProperty.getLong());
});
-
- this.globalQpsController = qpsController;
- return this;
- }
-
- public QpsController getGlobalQpsController() {
- return globalQpsController;
}
- protected DynamicProperty getDynamicProperty(String configKey) {
- return DynamicProperty.getInstance(configKeyPrefix + configKey);
+ private AbstractQpsStrategy chooseStrategy(String configKey, Long limit, Long bucket,
+ String strategyName) {
+ if (StringUtils.isEmpty(strategyName)) {
+ strategyName = "FixedWindow";
+ }
+ AbstractQpsStrategy strategy = null;
+ List<IStrategyFactory> strategyFactories = SPIServiceUtils
+ .getOrLoadSortedService(IStrategyFactory.class);
+ for (IStrategyFactory strategyFactory : strategyFactories) {
+ strategy = strategyFactory.createStrategy(strategyName);
+ if (strategy != null) {
+ break;
+ }
+ }
+ if (strategy == null) {
+ throw new ServiceCombException(
+ "the qps strategy name " + strategyName + " is not exist , please check.");
+ }
+ strategy.setKey(configKey);
+ strategy.setQpsLimit(limit);
+ strategy.setBucketLimit(bucket);
+ return strategy;
}
}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java
new file mode 100644
index 0000000..8a712e3
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.qps;
+
+public interface QpsStrategy {
+
+ boolean isLimitNewRequest();
+
+ String name();
+}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java
new file mode 100644
index 0000000..65d36aa
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/AbstractQpsStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.qps.strategy;
+
+import org.apache.servicecomb.qps.QpsStrategy;
+
+
+public abstract class AbstractQpsStrategy implements QpsStrategy {
+
+ private Long qpsLimit;
+
+ private Long bucketLimit;
+
+ private String key;
+
+ public Long getBucketLimit() {
+ return bucketLimit;
+ }
+
+ public void setBucketLimit(Long bucketLimit) {
+ this.bucketLimit = bucketLimit;
+ }
+
+ @Override
+ public abstract boolean isLimitNewRequest();
+
+ @Override
+ public abstract String name();
+
+ public void setQpsLimit(Long qpsLimit) {
+ this.qpsLimit = qpsLimit;
+ }
+
+ public Long getQpsLimit() {
+ return qpsLimit;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/DefaultStrategyFactory.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/DefaultStrategyFactory.java
new file mode 100644
index 0000000..79037f7
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/DefaultStrategyFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.qps.strategy;
+
+public class DefaultStrategyFactory implements IStrategyFactory {
+
+ public AbstractQpsStrategy createStrategy(String strategyName) {
+ switch (strategyName) {
+ case "TokenBucket":
+ return new TokenBucketStrategy();
+ case "LeakyBucket":
+ return new LeakyBucketStrategy();
+ case "FixedWindow":
+ return new FixedWindowStrategy();
+ default:
+ return null;
+ }
+ }
+}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
similarity index 75%
rename from handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
rename to handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
index 5f294ba..f91f87f 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/FixedWindowStrategy.java
@@ -14,15 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.servicecomb.qps;
+package org.apache.servicecomb.qps.strategy;
import java.util.concurrent.atomic.AtomicLong;
-public class QpsController {
- private String key;
-
- private Integer qpsLimit;
+public class FixedWindowStrategy extends AbstractQpsStrategy {
// Interval begin time
private volatile long msCycleBegin;
@@ -35,26 +31,13 @@ public class QpsController {
private static final int CYCLE_LENGTH = 1000;
- public QpsController(String key, Integer qpsLimit) {
- this.key = key;
- this.qpsLimit = qpsLimit;
- this.msCycleBegin = System.currentTimeMillis();
- }
-
- public String getKey() {
- return key;
- }
-
- public Integer getQpsLimit() {
- return qpsLimit;
- }
-
- public void setQpsLimit(Integer qpsLimit) {
- this.qpsLimit = qpsLimit;
- }
+ private static final String STRATEGY_NAME = "FixedWindow";
// return true means new request need to be rejected
public boolean isLimitNewRequest() {
+ if (this.getQpsLimit() == null) {
+ throw new IllegalStateException("should not happen");
+ }
long newCount = requestCount.incrementAndGet();
long msNow = System.currentTimeMillis();
//Time jump cause the new request injected
@@ -66,7 +49,11 @@ public class QpsController {
// Configuration update and use is at the situation of multi-threaded concurrency
// It is possible that operation level updated to null,but schema level or microservice level does not updated
- int limitValue = (qpsLimit == null) ? Integer.MAX_VALUE : qpsLimit;
- return newCount - lastRequestCount >= limitValue;
+ return newCount - lastRequestCount >= this.getQpsLimit();
+ }
+
+ @Override
+ public String name() {
+ return STRATEGY_NAME;
}
}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/IStrategyFactory.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/IStrategyFactory.java
new file mode 100644
index 0000000..bebe8ce
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/IStrategyFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.qps.strategy;
+
+public interface IStrategyFactory {
+
+ AbstractQpsStrategy createStrategy(String strategyName);
+}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
new file mode 100644
index 0000000..d65d43b
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/LeakyBucketStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.qps.strategy;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * leaky bucket algorithm include 2 implementation :
+ * 1. as a meter : it's same as the token bucket.
+ * 2. as a queue : the bucket size equal to qpsLimit.
+ *
+ **/
+public class LeakyBucketStrategy extends AbstractQpsStrategy {
+
+ // Request count between Interval begin and now in one interval
+ private volatile AtomicLong requestCount = new AtomicLong();
+
+ private volatile long lastTime;
+
+ private long remainder = 0;
+
+ private static final String STRATEGY_NAME = "LeakyBucket";
+
+ @Override
+ public boolean isLimitNewRequest() {
+ if (this.getQpsLimit() == null) {
+ throw new IllegalStateException("should not happen");
+ }
+ if (this.getBucketLimit() == null) {
+ this.setBucketLimit(Math.max(2 * this.getQpsLimit(), Integer.MAX_VALUE));
+ }
+ long nowTime = System.currentTimeMillis();
+ //get the num of te period time
+ long leakCount = ((nowTime - lastTime + remainder) / 1000) * this.getQpsLimit();
+ remainder = (nowTime - lastTime + remainder) % 1000;
+ // leak the request
+ if (requestCount.longValue() > leakCount) {
+ requestCount.addAndGet(-leakCount);
+ } else {
+ requestCount.set(0);
+ }
+ lastTime = nowTime;
+ //compute this time
+ if (requestCount.longValue() < this.getBucketLimit()) {
+ requestCount.incrementAndGet();
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String name() {
+ return STRATEGY_NAME;
+ }
+}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java
new file mode 100644
index 0000000..082906f
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/strategy/TokenBucketStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.qps.strategy;
+
+public class TokenBucketStrategy extends LeakyBucketStrategy {
+
+ private static final String STRATEGY_NAME = "TokenBucket";
+
+ @Override
+ public String name() {
+ return STRATEGY_NAME;
+ }
+}
diff --git a/handlers/handler-flowcontrol-qps/src/main/resources/META-INF/services/org.apache.servicecomb.qps.strategy.IStrategyFactory b/handlers/handler-flowcontrol-qps/src/main/resources/META-INF/services/org.apache.servicecomb.qps.strategy.IStrategyFactory
new file mode 100644
index 0000000..32f53fa
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/main/resources/META-INF/services/org.apache.servicecomb.qps.strategy.IStrategyFactory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.servicecomb.qps.strategy.DefaultStrategyFactory
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java
index 44b86b2..990b44d 100644
--- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/QpsControllerManagerTest.java
@@ -17,19 +17,17 @@
package org.apache.servicecomb.qps;
-import java.util.Map;
-
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.definition.SchemaMeta;
import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
+import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import mockit.Deencapsulation;
import mockit.Expectations;
import mockit.Mocked;
@@ -57,33 +55,33 @@ public class QpsControllerManagerTest {
result = "server.test";
}
};
- QpsControllerManager testQpsControllerManager = new QpsControllerManager()
- .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX);
- initTestQpsControllerManager(testQpsControllerManager, invocation, operationMeta);
+ QpsControllerManager testQpsControllerManager = new QpsControllerManager(false);
+ initTestQpsControllerManager(false, testQpsControllerManager, invocation, operationMeta);
// pojo
- setConfigWithDefaultPrefix("pojo", 100);
- QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo", qpsController.getKey());
- Assert.assertTrue(100 == qpsController.getQpsLimit());
- qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation);
- Assert.assertEquals("pojo2", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
- qpsController = testQpsControllerManager.getOrCreate("poj", invocation);
- Assert.assertEquals("poj", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ setConfigWithDefaultPrefix(false, "pojo", 100);
+ QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation);
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
+
+ qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation);
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
ArchaiusUtils.setProperty("servicecomb.flowcontrol.Consumer.qps.limit.poj.server", 10000);
- qpsController = testQpsControllerManager.getOrCreate("poj", invocation);
- Assert.assertEquals("poj.server", qpsController.getKey());
- Assert.assertEquals(qpsController.getQpsLimit(), (Integer) 10000);
+ qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation);
+ Assert.assertEquals("poj.server", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(((AbstractQpsStrategy) qpsStrategy).getQpsLimit(), (Long) 10000L);
ArchaiusUtils.setProperty("servicecomb.flowcontrol.Consumer.qps.limit.poj.server.test", 20000);
- qpsController = testQpsControllerManager.getOrCreate("poj", invocation);
- Assert.assertEquals("poj.server.test", qpsController.getKey());
- Assert.assertEquals(qpsController.getQpsLimit(), (Integer) 20000);
+ qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation);
+ Assert.assertEquals("poj.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(((AbstractQpsStrategy) qpsStrategy).getQpsLimit(), (Long) 20000L);
- testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta);
+ testGetOrCreateCommon(false, testQpsControllerManager, invocation, operationMeta);
}
@Test
@@ -99,35 +97,33 @@ public class QpsControllerManagerTest {
}
};
- QpsControllerManager testQpsControllerManager = new QpsControllerManager()
- .setGlobalQpsController(Config.PROVIDER_LIMIT_KEY_GLOBAL)
- .setConfigKeyPrefix(Config.CONSUMER_LIMIT_KEY_PREFIX);
+ QpsControllerManager testQpsControllerManager = new QpsControllerManager(true);
// global
setConfig(Config.PROVIDER_LIMIT_KEY_GLOBAL, 50);
- QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey());
- Assert.assertTrue(50 == qpsController.getQpsLimit());
- qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation);
- Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey());
- Assert.assertTrue(50 == qpsController.getQpsLimit());
- qpsController = testQpsControllerManager.getOrCreate("poj", invocation);
- Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey());
- Assert.assertTrue(50 == qpsController.getQpsLimit());
+ QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation);
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation);
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
// pojo
- setConfigWithDefaultPrefix("pojo", 100);
- qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo", qpsController.getKey());
- Assert.assertTrue(100 == qpsController.getQpsLimit());
- qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation);
- Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey());
- Assert.assertTrue(50 == qpsController.getQpsLimit());
- qpsController = testQpsControllerManager.getOrCreate("poj", invocation);
- Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, qpsController.getKey());
- Assert.assertTrue(50 == qpsController.getQpsLimit());
-
- testGetOrCreateCommon(testQpsControllerManager, invocation, operationMeta);
+ setConfigWithDefaultPrefix(true, "pojo", 100);
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation);
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation);
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(50 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
+
+ testGetOrCreateCommon(true, testQpsControllerManager, invocation, operationMeta);
}
@Test
@@ -142,10 +138,11 @@ public class QpsControllerManagerTest {
result = "schema.opr";
}
};
- QpsControllerManager qpsControllerManager = new QpsControllerManager();
- QpsController qpsController = qpsControllerManager.getOrCreate("service", invocation);
- Assert.assertEquals("service", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ QpsControllerManager qpsControllerManager = new QpsControllerManager(true);
+ QpsStrategy qpsStrategy = qpsControllerManager.getOrCreate("service", invocation);
+ Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit",
+ ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
new Expectations() {
{
@@ -157,9 +154,10 @@ public class QpsControllerManagerTest {
result = "test_schema.test_opr";
}
};
- qpsController = qpsControllerManager.getOrCreate("test_service", invocation);
- Assert.assertEquals("test_service", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ qpsStrategy = qpsControllerManager.getOrCreate("test_service", invocation);
+ Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit",
+ ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
new Expectations() {
{
@@ -171,9 +169,10 @@ public class QpsControllerManagerTest {
result = "test-schema.test-opr";
}
};
- qpsController = qpsControllerManager.getOrCreate("test-service", invocation);
- Assert.assertEquals("test-service", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ qpsStrategy = qpsControllerManager.getOrCreate("test-service", invocation);
+ Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit",
+ ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
new Expectations() {
{
@@ -185,9 +184,10 @@ public class QpsControllerManagerTest {
result = "schema.opr.tail";
}
};
- qpsController = qpsControllerManager.getOrCreate("svc", invocation);
- Assert.assertEquals("svc", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation);
+ Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit",
+ ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
new Expectations() {
{
@@ -199,12 +199,14 @@ public class QpsControllerManagerTest {
result = "schema.opr2.tail";
}
};
- qpsController = qpsControllerManager.getOrCreate("svc", invocation);
- Assert.assertEquals("svc", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ qpsStrategy = qpsControllerManager.getOrCreate("svc", invocation);
+ Assert.assertEquals("servicecomb.flowcontrol.Provider.qps.global.limit",
+ ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
}
- private void testGetOrCreateCommon(QpsControllerManager testQpsControllerManager, Invocation invocation,
+ private void testGetOrCreateCommon(boolean isProvider, QpsControllerManager testQpsControllerManager,
+ Invocation invocation,
OperationMeta operationMeta) {
new Expectations() {
{
@@ -214,10 +216,10 @@ public class QpsControllerManagerTest {
result = "server.test";
}
};
- setConfigWithDefaultPrefix("pojo.server", 200);
- QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo.server", qpsController.getKey());
- Assert.assertTrue(200 == qpsController.getQpsLimit());
+ setConfigWithDefaultPrefix(isProvider, "pojo.server", 200);
+ QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
new Expectations() {
{
invocation.getOperationMeta();
@@ -226,9 +228,9 @@ public class QpsControllerManagerTest {
result = "server2.test";
}
};
- qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo", qpsController.getKey());
- Assert.assertTrue(100 == qpsController.getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
new Expectations() {
{
invocation.getOperationMeta();
@@ -237,9 +239,9 @@ public class QpsControllerManagerTest {
result = "serve.test";
}
};
- qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo", qpsController.getKey());
- Assert.assertTrue(100 == qpsController.getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(100 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
// pojo.server.test
new Expectations() {
@@ -250,10 +252,10 @@ public class QpsControllerManagerTest {
result = "server.test";
}
};
- setConfigWithDefaultPrefix("pojo.server.test", 300);
- qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo.server.test", qpsController.getKey());
- Assert.assertTrue(300 == qpsController.getQpsLimit());
+ setConfigWithDefaultPrefix(isProvider, "pojo.server.test", 300);
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo.server.test", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(300 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
new Expectations() {
{
invocation.getOperationMeta();
@@ -262,9 +264,9 @@ public class QpsControllerManagerTest {
result = "server.test2";
}
};
- qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo.server", qpsController.getKey());
- Assert.assertTrue(200 == qpsController.getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
new Expectations() {
{
invocation.getOperationMeta();
@@ -274,15 +276,16 @@ public class QpsControllerManagerTest {
result = "server.tes";
}
};
- qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo.server", qpsController.getKey());
- Assert.assertTrue(200 == qpsController.getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ Assert.assertEquals("pojo.server", ((AbstractQpsStrategy) qpsStrategy).getKey());
+ Assert.assertTrue(200 == ((AbstractQpsStrategy) qpsStrategy).getQpsLimit());
}
/**
* Init testQpsControllerManager to test search function.
*/
- private void initTestQpsControllerManager(QpsControllerManager testQpsControllerManager, Invocation invocation,
+ private void initTestQpsControllerManager(boolean isProvider, QpsControllerManager testQpsControllerManager,
+ Invocation invocation,
OperationMeta operationMeta) {
// pojo.server.test
new Expectations() {
@@ -295,9 +298,13 @@ public class QpsControllerManagerTest {
result = "server.test";
}
};
- QpsController qpsController = testQpsControllerManager.getOrCreate("pojo", invocation);
- Assert.assertEquals("pojo", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ QpsStrategy qpsStrategy = testQpsControllerManager.getOrCreate("pojo", invocation);
+ if (isProvider) {
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ } else {
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ }
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
// pojo.server.test2
new Expectations() {
@@ -311,6 +318,12 @@ public class QpsControllerManagerTest {
}
};
testQpsControllerManager.getOrCreate("pojo", invocation);
+ if (isProvider) {
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ } else {
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ }
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
// pojo.server.tes
new Expectations() {
@@ -324,6 +337,12 @@ public class QpsControllerManagerTest {
}
};
testQpsControllerManager.getOrCreate("pojo", invocation);
+ if (isProvider) {
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ } else {
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ }
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
// pojo.server2.test
new Expectations() {
@@ -337,6 +356,12 @@ public class QpsControllerManagerTest {
}
};
testQpsControllerManager.getOrCreate("pojo", invocation);
+ if (isProvider) {
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ } else {
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ }
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
// pojo.serve.test
new Expectations() {
@@ -350,6 +375,12 @@ public class QpsControllerManagerTest {
}
};
testQpsControllerManager.getOrCreate("pojo", invocation);
+ if (isProvider) {
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ } else {
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ }
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
// pojo2.server.test
new Expectations() {
@@ -362,9 +393,13 @@ public class QpsControllerManagerTest {
result = "server.test";
}
};
- qpsController = testQpsControllerManager.getOrCreate("pojo2", invocation);
- Assert.assertEquals("pojo2", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("pojo2", invocation);
+ if (isProvider) {
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ } else {
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ }
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
// poj.server.test
new Expectations() {
@@ -377,9 +412,13 @@ public class QpsControllerManagerTest {
result = "server.test";
}
};
- qpsController = testQpsControllerManager.getOrCreate("poj", invocation);
- Assert.assertEquals("poj", qpsController.getKey());
- Assert.assertNull(qpsController.getQpsLimit());
+ qpsStrategy = testQpsControllerManager.getOrCreate("poj", invocation);
+ if (isProvider) {
+ Assert.assertEquals(Config.PROVIDER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ } else {
+ Assert.assertEquals(Config.CONSUMER_LIMIT_KEY_GLOBAL, ((AbstractQpsStrategy) qpsStrategy).getKey());
+ }
+ Assert.assertEquals(Integer.MAX_VALUE, ((AbstractQpsStrategy) qpsStrategy).getQpsLimit().intValue());
}
@Test
@@ -425,16 +464,12 @@ public class QpsControllerManagerTest {
ArchaiusUtils.setProperty(key, value);
}
- public static void setConfigWithDefaultPrefix(String key, int value) {
+ private static void setConfigWithDefaultPrefix(boolean isProvider, String key, int value) {
String configKey = Config.CONSUMER_LIMIT_KEY_PREFIX + key;
- ArchaiusUtils.setProperty(configKey, value);
- }
+ if (isProvider) {
+ configKey = Config.PROVIDER_LIMIT_KEY_PREFIX + key;
+ }
- public static void clearState(QpsControllerManager qpsControllerManager) {
- Map<String, QpsController> objMap = Deencapsulation.getField(qpsControllerManager, "qualifiedNameControllerMap");
- objMap.clear();
- Map<String, QpsController> configQpsControllerMap = Deencapsulation
- .getField(qpsControllerManager, "configQpsControllerMap");
- configQpsControllerMap.clear();
+ ArchaiusUtils.setProperty(configKey, value);
}
}
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java
index 04f9400..9035ba5 100644
--- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConfig.java
@@ -23,10 +23,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-/**
- *
- *
- */
public class TestConfig {
@BeforeClass
public static void classSetup() {
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java
index 3e6bbba..5aef2dd 100644
--- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestConsumerQpsFlowControlHandler.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
+import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy;
+import org.apache.servicecomb.qps.strategy.FixedWindowStrategy;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
@@ -42,7 +44,7 @@ import mockit.MockUp;
public class TestConsumerQpsFlowControlHandler {
- ConsumerQpsFlowControlHandler handler = new ConsumerQpsFlowControlHandler();
+ ConsumerQpsFlowControlHandler handler;
Invocation invocation = Mockito.mock(Invocation.class);
@@ -56,42 +58,38 @@ public class TestConsumerQpsFlowControlHandler {
@Before
public void setUP() {
ArchaiusUtils.resetConfig();
- QpsControllerManagerTest.clearState(ConsumerQpsFlowControlHandler.qpsControllerMgr);
+ handler = new ConsumerQpsFlowControlHandler();
}
@After
public void afterTest() {
ArchaiusUtils.resetConfig();
- QpsControllerManagerTest.clearState(ConsumerQpsFlowControlHandler.qpsControllerMgr);
}
@Test
public void testQpsController() {
- // to avoid time influence on QpsController
- new MockUp<System>() {
- @Mock
- long currentTimeMillis() {
- return 1L;
- }
- };
- QpsController qpsController = new QpsController("abc", 100);
- Assert.assertEquals(false, qpsController.isLimitNewRequest());
+ AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy();
+ qpsStrategy.setKey("abc");
+ qpsStrategy.setQpsLimit(100L);
+ Assert.assertEquals(false, qpsStrategy.isLimitNewRequest());
- qpsController.setQpsLimit(1);
- Assert.assertEquals(true, qpsController.isLimitNewRequest());
+ qpsStrategy.setQpsLimit(1L);
+ Assert.assertEquals(true, qpsStrategy.isLimitNewRequest());
}
@Test
public void testHandle() throws Exception {
String key = "svc.schema.opr";
- QpsController qpsController = new QpsController("key", 12);
+ AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy();
+ qpsStrategy.setKey("key");
+ qpsStrategy.setQpsLimit(12L);
Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta);
Mockito.when(operationMeta.getSchemaQualifiedName()).thenReturn("schema.opr");
Mockito.when(invocation.getSchemaId()).thenReturn("schema");
Mockito.when(invocation.getMicroserviceName()).thenReturn("svc");
- setQpsController(key, qpsController);
- new MockUp<QpsController>() {
+ setQpsController(key, qpsStrategy);
+ new MockUp<FixedWindowStrategy>() {
@Mock
public boolean isLimitNewRequest() {
return true;
@@ -100,8 +98,8 @@ public class TestConsumerQpsFlowControlHandler {
new MockUp<QpsControllerManager>() {
@Mock
- protected QpsController create(String qualifiedNameKey) {
- return qpsController;
+ protected QpsStrategy create(String qualifiedNameKey) {
+ return qpsStrategy;
}
};
@@ -118,14 +116,16 @@ public class TestConsumerQpsFlowControlHandler {
@Test
public void testHandleIsLimitNewRequestAsFalse() throws Exception {
String key = "service.schema.id";
- QpsController qpsController = new QpsController("service", 12);
+ AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy();
+ qpsStrategy.setKey("service");
+ qpsStrategy.setQpsLimit(12L);
Mockito.when(invocation.getMicroserviceName()).thenReturn("service");
Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta);
Mockito.when(operationMeta.getSchemaQualifiedName()).thenReturn("schema.id");
- setQpsController(key, qpsController);
+ setQpsController(key, qpsStrategy);
- new MockUp<QpsController>() {
+ new MockUp<QpsStrategy>() {
@Mock
public boolean isLimitNewRequest() {
return false;
@@ -135,8 +135,8 @@ public class TestConsumerQpsFlowControlHandler {
new MockUp<QpsControllerManager>() {
@Mock
- protected QpsController create(String qualifiedNameKey) {
- return qpsController;
+ protected QpsStrategy create(String qualifiedNameKey) {
+ return qpsStrategy;
}
};
handler.handle(invocation, asyncResp);
@@ -144,10 +144,10 @@ public class TestConsumerQpsFlowControlHandler {
Mockito.verify(invocation).next(asyncResp);
}
- private void setQpsController(String key, QpsController qpsController) {
+ private void setQpsController(String key, QpsStrategy qpsStrategy) {
QpsControllerManager qpsControllerManager = Deencapsulation.getField(handler, "qpsControllerMgr");
- ConcurrentHashMap<String, QpsController> objMap = Deencapsulation
+ ConcurrentHashMap<String, QpsStrategy> objMap = Deencapsulation
.getField(qpsControllerManager, "qualifiedNameControllerMap");
- objMap.put(key, qpsController);
+ objMap.put(key, qpsStrategy);
}
}
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
index 9f73155..dfbecfd 100644
--- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
@@ -26,6 +26,8 @@ import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
+import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy;
+import org.apache.servicecomb.qps.strategy.FixedWindowStrategy;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
@@ -43,28 +45,25 @@ import mockit.Mock;
import mockit.MockUp;
public class TestProviderQpsFlowControlHandler {
- ProviderQpsFlowControlHandler handler = new ProviderQpsFlowControlHandler();
+ ProviderQpsFlowControlHandler handler;
Invocation invocation = Mockito.mock(Invocation.class);
AsyncResponse asyncResp = Mockito.mock(AsyncResponse.class);
- OperationMeta operationMeta = Mockito.mock(OperationMeta.class);
-
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setUP() {
ArchaiusUtils.resetConfig();
- QpsControllerManagerTest.clearState(ProviderQpsFlowControlHandler.qpsControllerMgr);
+ handler = new ProviderQpsFlowControlHandler();
ArchaiusUtils.setProperty(Config.PROVIDER_LIMIT_KEY_PREFIX + "test", 1);
}
@After
public void afterTest() {
ArchaiusUtils.resetConfig();
- QpsControllerManagerTest.clearState(ProviderQpsFlowControlHandler.qpsControllerMgr);
}
@Test
@@ -84,7 +83,6 @@ public class TestProviderQpsFlowControlHandler {
result = new RuntimeException("test error");
}
};
- mockUpSystemTime();
ProviderQpsFlowControlHandler gHandler = new ProviderQpsFlowControlHandler();
gHandler.handle(invocation, asyncResp);
@@ -100,22 +98,24 @@ public class TestProviderQpsFlowControlHandler {
@Test
public void testQpsController() {
- mockUpSystemTime();
- QpsController qpsController = new QpsController("abc", 100);
- assertFalse(qpsController.isLimitNewRequest());
+ AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy();
+ qpsStrategy.setKey("abc");
+ qpsStrategy.setQpsLimit(100L);
+ assertFalse(qpsStrategy.isLimitNewRequest());
- qpsController.setQpsLimit(1);
- assertTrue(qpsController.isLimitNewRequest());
+ qpsStrategy.setQpsLimit(1L);
+ assertTrue(qpsStrategy.isLimitNewRequest());
}
@Test
public void testHandleOnSourceMicroserviceNameIsNull() throws Exception {
Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null);
+ OperationMeta operationMeta = QpsControllerManagerTest.getMockOperationMeta("pojo", "server", "opr");
+ Mockito.when(invocation.getOperationMeta()).thenReturn(operationMeta);
+ Mockito.when(invocation.getSchemaId()).thenReturn("server");
// only when handler index <= 0, the qps logic works
Mockito.when(invocation.getHandlerIndex()).thenReturn(0);
ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1);
- ProviderQpsFlowControlHandler.qpsControllerMgr
- .setGlobalQpsController("servicecomb.flowcontrol.Provider.qps.global.limit");
handler.handle(invocation, asyncResp);
handler.handle(invocation, asyncResp);
@@ -145,8 +145,11 @@ public class TestProviderQpsFlowControlHandler {
new MockUp<QpsControllerManager>() {
@Mock
- protected QpsController create(String qualifiedNameKey) {
- return new QpsController(qualifiedNameKey, 1);
+ protected QpsStrategy create(String qualifiedNameKey) {
+ AbstractQpsStrategy strategy = new FixedWindowStrategy();
+ strategy.setKey(qualifiedNameKey);
+ strategy.setQpsLimit(1L);
+ return strategy;
}
};
@@ -172,8 +175,11 @@ public class TestProviderQpsFlowControlHandler {
new MockUp<QpsControllerManager>() {
@Mock
- protected QpsController create(String qualifiedNameKey) {
- return new QpsController(qualifiedNameKey, 1);
+ protected QpsStrategy create(String qualifiedNameKey) {
+ AbstractQpsStrategy strategy = new FixedWindowStrategy();
+ strategy.setKey(qualifiedNameKey);
+ strategy.setQpsLimit(1L);
+ return strategy;
}
};
handler.handle(invocation, asyncResp);
@@ -181,14 +187,4 @@ public class TestProviderQpsFlowControlHandler {
Mockito.verify(invocation, times(0)).next(asyncResp);
Mockito.verify(asyncResp, times(0)).producerFail(Mockito.any(Exception.class));
}
-
- private void mockUpSystemTime() {
- // to avoid time influence on QpsController
- new MockUp<System>() {
- @Mock
- long currentTimeMillis() {
- return 1L;
- }
- };
- }
}
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java
new file mode 100644
index 0000000..04c6f02
--- /dev/null
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestQpsStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.qps;
+
+import org.apache.servicecomb.qps.strategy.AbstractQpsStrategy;
+import org.apache.servicecomb.qps.strategy.FixedWindowStrategy;
+import org.apache.servicecomb.qps.strategy.LeakyBucketStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @Author GuoYl123
+ * @Date 2020/7/16
+ **/
+public class TestQpsStrategy {
+
+ @Test
+ public void testFixedWindowStrategy() {
+ AbstractQpsStrategy qpsStrategy = new FixedWindowStrategy();
+ qpsStrategy.setKey("abc");
+ qpsStrategy.setQpsLimit(100L);
+ Assert.assertEquals(false, qpsStrategy.isLimitNewRequest());
+
+ qpsStrategy.setQpsLimit(1L);
+ Assert.assertEquals(true, qpsStrategy.isLimitNewRequest());
+ }
+
+
+ @Test
+ public void testLeakyBucketStrategy() {
+ LeakyBucketStrategy qpsStrategy = new LeakyBucketStrategy();
+ qpsStrategy.setKey("abc");
+ qpsStrategy.setQpsLimit(100L);
+ Assert.assertEquals(false, qpsStrategy.isLimitNewRequest());
+
+ qpsStrategy.setQpsLimit(1L);
+ qpsStrategy.setBucketLimit(1L);
+ Assert.assertEquals(true, qpsStrategy.isLimitNewRequest());
+ }
+
+}