You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/12/04 13:55:00 UTC
[servicecomb-java-chassis] 01/02: [SCB-1056] put provider flow
control logic in front
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit dfe51a756f0af8261f240d132ae8aab6c76797e4
Author: yaohaishi <ya...@huawei.com>
AuthorDate: Thu Nov 29 19:07:08 2018 +0800
[SCB-1056] put provider flow control logic in front
---
.../common/rest/AbstractRestInvocation.java | 55 ++++++++-
.../common/rest/TestAbstractRestInvocation.java | 128 +++++++++++++++++----
.../qps/ProviderQpsFlowControlHandler.java | 16 ++-
.../org/apache/servicecomb/qps/QpsController.java | 4 +-
.../qps/TestProviderQpsFlowControlHandler.java | 54 +++++----
5 files changed, 199 insertions(+), 58 deletions(-)
diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 932aef4..5f04c01 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;
+import javax.xml.ws.Holder;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
@@ -36,6 +37,7 @@ import org.apache.servicecomb.common.rest.filter.HttpServerFilterBeforeSendRespo
import org.apache.servicecomb.common.rest.locator.OperationLocator;
import org.apache.servicecomb.common.rest.locator.ServicePathManager;
import org.apache.servicecomb.core.Const;
+import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.MicroserviceMeta;
import org.apache.servicecomb.core.definition.OperationMeta;
@@ -47,6 +49,8 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public abstract class AbstractRestInvocation {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRestInvocation.class);
@@ -126,6 +130,19 @@ public abstract class AbstractRestInvocation {
invocation.getInvocationStageTrace().startSchedule();
OperationMeta operationMeta = restOperationMeta.getOperationMeta();
+ try {
+ this.setContext();
+ } catch (Exception e) {
+ LOGGER.error("failed to set invocation context", e);
+ sendFailResponse(e);
+ return;
+ }
+
+ Holder<Boolean> qpsFlowControlReject = checkQpsFlowControl(operationMeta);
+ if (qpsFlowControlReject.value) {
+ return;
+ }
+
operationMeta.getExecutor().execute(() -> {
synchronized (this.requestEx) {
try {
@@ -150,6 +167,39 @@ public abstract class AbstractRestInvocation {
});
}
+ private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
+ Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
+ Handler providerQpsFlowControlHandler = findQpsFlowControlHandler(operationMeta);
+ if (null != providerQpsFlowControlHandler) {
+ try {
+ providerQpsFlowControlHandler.handle(invocation, response -> {
+ qpsFlowControlReject.value = true;
+ produceProcessor = ProduceProcessorManager.JSON_PROCESSOR;
+ sendResponse(response);
+ });
+ } catch (Exception e) {
+ LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
+ qpsFlowControlReject.value = true;
+ sendFailResponse(e);
+ }
+ }
+ return qpsFlowControlReject;
+ }
+
+ @VisibleForTesting
+ Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
+ final List<Handler> providerHandlerChain = operationMeta.getSchemaMeta().getProviderHandlerChain();
+ Handler providerQpsFlowControlHandler = null;
+ for (Handler handler : providerHandlerChain) {
+ // matching by class name is more or less better than importing an extra maven dependency
+ if ("org.apache.servicecomb.qps.ProviderQpsFlowControlHandler".equals(handler.getClass().getName())) {
+ providerQpsFlowControlHandler = handler;
+ break;
+ }
+ }
+ return providerQpsFlowControlHandler;
+ }
+
private boolean isInQueueTimeout() {
return System.nanoTime() - invocation.getInvocationStageTrace().getStart() >
CommonRestConfig.getRequestWaitInPoolTimeout() * 1_000_000;
@@ -183,7 +233,6 @@ public abstract class AbstractRestInvocation {
protected Response prepareInvoke() throws Throwable {
this.initProduceProcessor();
- this.setContext();
invocation.getHandlerContext().put(RestConst.REST_REQUEST, requestEx);
invocation.getInvocationStageTrace().startServerFiltersRequest();
@@ -201,9 +250,7 @@ public abstract class AbstractRestInvocation {
protected void doInvoke() throws Throwable {
invocation.getInvocationStageTrace().startHandlersRequest();
- invocation.next(resp -> {
- sendResponseQuietly(resp);
- });
+ invocation.next(resp -> sendResponseQuietly(resp));
}
public void sendFailResponse(Throwable throwable) {
diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index faf37a5..3db0dbf 100644
--- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -17,6 +17,8 @@
package org.apache.servicecomb.common.rest;
+import static org.junit.Assert.assertEquals;
+
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -49,10 +51,11 @@ import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.foundation.common.utils.JsonUtils;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletRequest;
+import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletResponse;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
-import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.apache.servicecomb.swagger.invocation.response.Headers;
@@ -188,8 +191,8 @@ public class TestAbstractRestInvocation {
restInvocation.initProduceProcessor();
Assert.fail("must throw exception");
} catch (InvocationException e) {
- Assert.assertEquals(Status.NOT_ACCEPTABLE, e.getStatus());
- Assert.assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage());
+ assertEquals(Status.NOT_ACCEPTABLE, e.getStatus());
+ assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage());
}
}
@@ -253,7 +256,7 @@ public class TestAbstractRestInvocation {
@Test
public void getContext() {
invocation.addContext("key", "test");
- Assert.assertEquals("test", restInvocation.getContext("key"));
+ assertEquals("test", restInvocation.getContext("key"));
}
@Test
@@ -401,7 +404,7 @@ public class TestAbstractRestInvocation {
restInvocation.invoke();
- Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest());
+ assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest());
}
@Test
@@ -571,7 +574,7 @@ public class TestAbstractRestInvocation {
initRestInvocation();
restInvocation.sendResponse(response);
- Assert.assertEquals(expected, result);
+ assertEquals(expected, result);
}
@Test
@@ -658,7 +661,7 @@ public class TestAbstractRestInvocation {
restInvocation.sendResponse(response);
Assert.fail("must throw exception");
} catch (Error e) {
- Assert.assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap());
+ assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap());
}
}
@@ -693,8 +696,8 @@ public class TestAbstractRestInvocation {
initRestInvocation();
restInvocation.sendResponse(response);
- Assert.assertEquals("\"ok\"", buffer.toString());
- Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse());
+ assertEquals("\"ok\"", buffer.toString());
+ assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse());
}
@Test
@@ -747,7 +750,7 @@ public class TestAbstractRestInvocation {
restInvocation.setHttpServerFilters(httpServerFilters);
restInvocation.sendResponse(response);
- Assert.assertEquals("\"ok\"-filter", buffer.toString());
+ assertEquals("\"ok\"-filter", buffer.toString());
}
@Test
@@ -905,10 +908,10 @@ public class TestAbstractRestInvocation {
EventManager.unregister(subscriber);
Assert.assertTrue(result.value);
- Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart());
- Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule());
+ assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart());
+ assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule());
Assert.assertSame(invocation, eventHolder.value.getInvocation());
- Assert.assertEquals("tid", invocation.getTraceId());
+ assertEquals("tid", invocation.getTraceId());
}
@Test
@@ -936,19 +939,14 @@ public class TestAbstractRestInvocation {
Assert.assertTrue(result.value);
Assert.assertSame(invocation, restInvocation.invocation);
- Assert.assertEquals(time, invocation.getInvocationStageTrace().getStartExecution());
+ assertEquals(time, invocation.getInvocationStageTrace().getStartExecution());
}
@Test
public void doInvoke(@Mocked Endpoint endpoint, @Mocked OperationMeta operationMeta,
@Mocked Object[] swaggerArguments, @Mocked SchemaMeta schemaMeta) throws Throwable {
Response response = Response.ok("ok");
- Handler handler = new Handler() {
- @Override
- public void handle(Invocation invocation, AsyncResponse asyncResp) {
- asyncResp.complete(response);
- }
- };
+ Handler handler = (invocation, asyncResp) -> asyncResp.complete(response);
List<Handler> handlerChain = Arrays.asList(handler);
Deencapsulation.setField(invocation, "handlerList", handlerChain);
@@ -964,7 +962,93 @@ public class TestAbstractRestInvocation {
restInvocation.doInvoke();
Assert.assertSame(response, result.value);
- Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest());
- Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse());
+ assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest());
+ assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse());
+ }
+
+ @Test
+ public void scheduleInvocation_invocationContextDeserializeError() {
+ requestEx = new AbstractHttpServletRequest() {
+ @Override
+ public String getHeader(String name) {
+ return "{\"x-cse-src-microservice\":'source\"}";
+ }
+ };
+ Holder<Integer> status = new Holder<>();
+ Holder<String> reasonPhrase = new Holder<>();
+ Holder<Integer> endCount = new Holder<>(0);
+ responseEx = new AbstractHttpServletResponse() {
+ @SuppressWarnings("deprecation")
+ @Override
+ public void setStatus(int sc, String sm) {
+ status.value = sc;
+ reasonPhrase.value = sm;
+ }
+
+ @Override
+ public void flushBuffer() {
+ endCount.value = endCount.value + 1;
+ }
+
+ @Override
+ public void setContentType(String type) {
+ assertEquals("application/json; charset=utf-8", type);
+ }
+ };
+ restInvocation.requestEx = requestEx;
+ restInvocation.responseEx = responseEx;
+
+ restInvocation.scheduleInvocation();
+
+ assertEquals(Integer.valueOf(590), status.value);
+ assertEquals("Cse Internal Server Error", reasonPhrase.value);
+ assertEquals(Integer.valueOf(1), endCount.value);
+ }
+
+ @Test
+ public void scheduleInvocation_flowControlReject() {
+ restInvocation = new AbstractRestInvocationForTest() {
+ @Override
+ Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
+ return (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException(
+ new HttpStatus(429, "Too Many Requests"),
+ new CommonExceptionData("rejected by qps flowcontrol")));
+ }
+ };
+ Holder<Integer> status = new Holder<>();
+ Holder<String> reasonPhrase = new Holder<>();
+ Holder<Integer> endCount = new Holder<>(0);
+ Holder<String> responseBody = new Holder<>();
+ responseEx = new AbstractHttpServletResponse() {
+ @SuppressWarnings("deprecation")
+ @Override
+ public void setStatus(int sc, String sm) {
+ status.value = sc;
+ reasonPhrase.value = sm;
+ }
+
+ @Override
+ public void flushBuffer() {
+ endCount.value = endCount.value + 1;
+ }
+
+ @Override
+ public void setContentType(String type) {
+ assertEquals("application/json; charset=utf-8", type);
+ }
+
+ @Override
+ public void setBodyBuffer(Buffer bodyBuffer) {
+ responseBody.value = bodyBuffer.toString();
+ }
+ };
+ setup();
+
+ restInvocation.scheduleInvocation();
+
+ assertEquals(Integer.valueOf(429), status.value);
+ assertEquals("Too Many Requests", reasonPhrase.value);
+ assertEquals("{\"message\":\"rejected by qps flowcontrol\"}", responseBody.value);
+ assertEquals(Integer.valueOf(1), endCount.value);
}
}
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
index 88ee936..67ade94 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
@@ -32,21 +32,25 @@ public class ProviderQpsFlowControlHandler implements Handler {
@Override
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
- if (!Config.INSTANCE.isProviderEnabled()) {
+ if (invocation.getHandlerIndex() > 0) {
+ // handlerIndex > 0, which means this handler is executed in handler chain.
+ // As this flow control logic has been executed in advance, this time it should be ignored.
invocation.next(asyncResp);
return;
}
+ // The real executing position of this handler is no longer in handler chain, but in AbstractRestInvocation.
+ // Therefore, the Invocation#next() method should not be called below.
+ if (!Config.INSTANCE.isProviderEnabled()) {
+ return;
+ }
+
String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE);
QpsController qpsController =
StringUtils.isEmpty(microserviceName)
? qpsControllerMgr.getGlobalQpsController()
: qpsControllerMgr.getOrCreate(microserviceName, invocation);
- if (isLimitNewRequest(qpsController, asyncResp)) {
- return;
- }
-
- invocation.next(asyncResp);
+ isLimitNewRequest(qpsController, asyncResp);
}
private boolean isLimitNewRequest(QpsController qpsController, AsyncResponse asyncResp) {
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
index 88c8074..5f294ba 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
@@ -59,15 +59,13 @@ public class QpsController {
long msNow = System.currentTimeMillis();
//Time jump cause the new request injected
if (msNow - msCycleBegin > CYCLE_LENGTH || msNow < msCycleBegin) {
-
- //no need worry about concurrency porbleam
+ //no need worry about concurrency problem
lastRequestCount = newCount;
msCycleBegin = msNow;
}
// Configuration update and use is at the situation of multi-threaded concurrency
// It is possible that operation level updated to null,but schema level or microservice level does not updated
-
int limitValue = (qpsLimit == null) ? Integer.MAX_VALUE : qpsLimit;
return newCount - lastRequestCount >= limitValue;
}
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
index f671eea..9f73155 100644
--- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
@@ -18,6 +18,8 @@
package org.apache.servicecomb.qps;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import org.apache.servicecomb.core.Const;
@@ -59,7 +61,6 @@ public class TestProviderQpsFlowControlHandler {
ArchaiusUtils.setProperty(Config.PROVIDER_LIMIT_KEY_PREFIX + "test", 1);
}
-
@After
public void afterTest() {
ArchaiusUtils.resetConfig();
@@ -71,6 +72,8 @@ public class TestProviderQpsFlowControlHandler {
final @Injectable AsyncResponse asyncResp) throws Exception {
new Expectations() {
{
+ invocation.getHandlerIndex();
+ result = 0;
invocation.getContext(Const.SRC_MICROSERVICE);
result = "test";
invocation.getOperationMeta();
@@ -95,25 +98,42 @@ public class TestProviderQpsFlowControlHandler {
gHandler.handle(invocation, asyncResp);
}
-
@Test
public void testQpsController() {
mockUpSystemTime();
QpsController qpsController = new QpsController("abc", 100);
- assertEquals(false, qpsController.isLimitNewRequest());
+ assertFalse(qpsController.isLimitNewRequest());
qpsController.setQpsLimit(1);
- assertEquals(true, qpsController.isLimitNewRequest());
+ assertTrue(qpsController.isLimitNewRequest());
}
@Test
public void testHandleOnSourceMicroserviceNameIsNull() throws Exception {
Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null);
+ // only when handler index <= 0, the qps logic works
+ Mockito.when(invocation.getHandlerIndex()).thenReturn(0);
+ ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1);
+ ProviderQpsFlowControlHandler.qpsControllerMgr
+ .setGlobalQpsController("servicecomb.flowcontrol.Provider.qps.global.limit");
+
+ handler.handle(invocation, asyncResp);
+ handler.handle(invocation, asyncResp);
+
+ // Invocation#getContext(String) is only invoked when the qps logic works
+ Mockito.verify(invocation, times(2)).getContext(Const.SRC_MICROSERVICE);
+ Mockito.verify(asyncResp, times(1)).producerFail(Mockito.any(Exception.class));
+ }
+
+ @Test
+ public void testHandleOnSourceOnHandlerIndexIsGreaterThan0() throws Exception {
+ Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null);
+ Mockito.when(invocation.getHandlerIndex()).thenReturn(1);
handler.handle(invocation, asyncResp);
handler.handle(invocation, asyncResp);
- Mockito.verify(invocation, times(2)).next(asyncResp);
+ Mockito.verify(invocation, times(0)).getContext(Mockito.anyString());
}
@Test
@@ -122,25 +142,19 @@ public class TestProviderQpsFlowControlHandler {
OperationMeta mockOperationMeta = QpsControllerManagerTest.getMockOperationMeta("pojo", "server", "opr");
Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta);
Mockito.when(invocation.getSchemaId()).thenReturn("server");
- new MockUp<QpsController>() {
- @Mock
- public boolean isLimitNewRequest() {
- return true;
- }
- };
new MockUp<QpsControllerManager>() {
-
@Mock
protected QpsController create(String qualifiedNameKey) {
- return new QpsController(qualifiedNameKey, 12);
+ return new QpsController(qualifiedNameKey, 1);
}
};
handler.handle(invocation, asyncResp);
+ handler.handle(invocation, asyncResp);
ArgumentCaptor<InvocationException> captor = ArgumentCaptor.forClass(InvocationException.class);
- Mockito.verify(asyncResp).producerFail(captor.capture());
+ Mockito.verify(asyncResp, times(1)).producerFail(captor.capture());
InvocationException invocationException = captor.getValue();
assertEquals(QpsConst.TOO_MANY_REQUESTS_STATUS, invocationException.getStatus());
@@ -155,23 +169,17 @@ public class TestProviderQpsFlowControlHandler {
.getMockOperationMeta("pojo", "server", "opr");
Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta);
Mockito.when(invocation.getSchemaId()).thenReturn("server");
- new MockUp<QpsController>() {
- @Mock
- public boolean isLimitNewRequest() {
- return false;
- }
- };
new MockUp<QpsControllerManager>() {
-
@Mock
protected QpsController create(String qualifiedNameKey) {
- return new QpsController(qualifiedNameKey, 12);
+ return new QpsController(qualifiedNameKey, 1);
}
};
handler.handle(invocation, asyncResp);
- Mockito.verify(invocation).next(asyncResp);
+ Mockito.verify(invocation, times(0)).next(asyncResp);
+ Mockito.verify(asyncResp, times(0)).producerFail(Mockito.any(Exception.class));
}
private void mockUpSystemTime() {