You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/12/06 10:06:48 UTC
[servicecomb-java-chassis] branch master updated: [SCB-1056] put
provider QPS flow control in front, for highway transport
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new 3f2d3da [SCB-1056] put provider QPS flow control in front, for highway transport
3f2d3da is described below
commit 3f2d3da028cf11eb52516c7710671236c4fc102f
Author: yaohaishi <ya...@huawei.com>
AuthorDate: Thu Dec 6 17:36:26 2018 +0800
[SCB-1056] put provider QPS flow control in front, for highway transport
---
.../transport/highway/HighwayServerInvoke.java | 33 ++++++++++-
.../transport/highway/TestHighwayCodec.java | 5 +-
.../transport/highway/TestHighwayServerInvoke.java | 69 ++++++++++++++++++----
3 files changed, 90 insertions(+), 17 deletions(-)
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 20e8e08..32bfb67 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -20,12 +20,14 @@ package org.apache.servicecomb.transport.highway;
import java.util.Map;
import javax.ws.rs.core.Response.Status;
+import javax.xml.ws.Holder;
import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.definition.MicroserviceMeta;
@@ -181,9 +183,38 @@ public class HighwayServerInvoke {
null);
invocation.onStart(null, start);
invocation.getInvocationStageTrace().startSchedule();
- operationMeta.getExecutor().execute(() -> runInExecutor());
+
+ // Copied from HighwayCodec#decodeRequest().
+ // Temporary measure so that the QPS flow control check can run before the request is scheduled; to be removed once the handler mechanism is refactored.
+ invocation.mergeContext(header.getContext());
+
+ Holder<Boolean> qpsFlowControlReject = checkQpsFlowControl(operationMeta);
+ if (qpsFlowControlReject.value) {
+ return;
+ }
+
+ operationMeta.getExecutor().execute(this::runInExecutor);
} catch (IllegalStateException e) {
sendResponse(header.getContext(), Response.providerFailResp(e));
}
}
+
+ /**
+ * Runs the provider QPS flow control handler of the given operation, if it has one.
+ * <p>
+ * If the handler invokes its response callback, or throws, the response is sent back through
+ * {@code sendResponse} and the returned holder is set to {@code true}.
+ * <p>
+ * NOTE(review): the caller reads the holder immediately after this method returns, so this
+ * appears to rely on the handler invoking the callback synchronously - confirm.
+ *
+ * @param operationMeta operation whose provider QPS flow control handler is looked up
+ * @return holder with value {@code true} when a response has already been sent here (callback
+ * invoked or exception thrown); {@code false} when there is no handler or the handler
+ * returned without invoking the callback
+ */
+ private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
+ // A Holder is used so that the lambda below can write the flag.
+ Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
+ // The getter is flagged as deprecated, hence the suppression.
+ @SuppressWarnings("deprecation")
+ Handler providerQpsFlowControlHandler = operationMeta.getProviderQpsFlowControlHandler();
+ if (null != providerQpsFlowControlHandler) {
+ try {
+ // Any response delivered to this callback is treated as a rejection and sent to the caller.
+ providerQpsFlowControlHandler.handle(invocation, response -> {
+ qpsFlowControlReject.value = true;
+ sendResponse(header.getContext(), response);
+ });
+ } catch (Exception e) {
+ // A failing handler also ends the request: reply with a provider failure response.
+ LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
+ qpsFlowControlReject.value = true;
+ sendResponse(header.getContext(), Response.providerFailResp(e));
+ }
+ }
+ return qpsFlowControlReject;
+ }
}
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
index a9932e2..ad3fedd 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
@@ -37,7 +37,6 @@ import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.ServiceRegistry;
import org.apache.servicecomb.serviceregistry.registry.ServiceRegistryFactory;
import org.apache.servicecomb.swagger.invocation.Response;
-import org.apache.servicecomb.swagger.invocation.context.InvocationContext;
import org.apache.servicecomb.transport.highway.message.RequestHeader;
import org.apache.servicecomb.transport.highway.message.ResponseHeader;
import org.junit.After;
@@ -81,7 +80,7 @@ public class TestHighwayCodec {
}
@Before
- public void setUp() throws Exception {
+ public void setUp() {
ServiceRegistry serviceRegistry = ServiceRegistryFactory.createLocal();
serviceRegistry.init();
RegistryUtils.setServiceRegistry(serviceRegistry);
@@ -106,7 +105,7 @@ public class TestHighwayCodec {
}
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
header = null;
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
index 7381dee..b4e202b 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
@@ -17,8 +17,13 @@
package org.apache.servicecomb.transport.highway;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
import javax.xml.ws.Holder;
+import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.SCBStatus;
import org.apache.servicecomb.core.definition.OperationMeta;
@@ -28,7 +33,11 @@ import org.apache.servicecomb.core.event.InvocationStartEvent;
import org.apache.servicecomb.core.executor.ReactiveExecutor;
import org.apache.servicecomb.core.unittest.UnitTestMeta;
import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
+import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
+import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.apache.servicecomb.transport.common.MockUtil;
import org.apache.servicecomb.transport.highway.message.RequestHeader;
import org.junit.AfterClass;
@@ -44,6 +53,7 @@ import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
+import mockit.Deencapsulation;
import mockit.Mock;
import mockit.MockUp;
@@ -54,7 +64,7 @@ public class TestHighwayServerInvoke {
}
}
- private UnitTestMeta unitTestMeta;
+ private static UnitTestMeta unitTestMeta;
private ByteBuf netSocketBuffer;
@@ -70,6 +80,7 @@ public class TestHighwayServerInvoke {
public static void classSetup() {
EventManager.eventBus = new EventBus();
SCBEngine.getInstance().setStatus(SCBStatus.UP);
+ unitTestMeta = new UnitTestMeta();
new MockUp<System>() {
@Mock
@@ -87,7 +98,6 @@ public class TestHighwayServerInvoke {
@Before
public void setup() {
- unitTestMeta = new UnitTestMeta();
socketAddress = new MockUp<SocketAddress>() {
@Mock
public String host() {
@@ -153,22 +163,55 @@ public class TestHighwayServerInvoke {
requestHeader.setDestMicroservice(schemaMeta.getMicroserviceName());
requestHeader.setSchemaId(schemaMeta.getSchemaId());
requestHeader.setOperationName(operationMeta.getOperationId());
- Assert.assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
+ assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
// make execution fail
MockUtil.getInstance().decodeRequestSucc = false;
highwayServerInvoke.execute();
EventManager.unregister(subscriber);
- Assert.assertEquals(true, Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
- Assert.assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
- Assert.assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
- Assert.assertTrue(highwayServerInvoke.invocation.getInvocationStageTrace().getStartExecution() != 0);
- Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStart());
- Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
- Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
- Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartSchedule());
- Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
- Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+ assertTrue(Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
+ assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
+ assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
+ assertTrue(highwayServerInvoke.invocation.getInvocationStageTrace().getStartExecution() != 0);
+ assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStart());
+ assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
+ assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+ assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartSchedule());
+ assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
+ assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+ }
+
+ /**
+ * Checks that a rejection produced by the provider QPS flow control handler shows up in the
+ * captured socket buffer after {@code execute()} is called.
+ * <p>
+ * A handler that always fails with 429 "Too Many Requests" is injected into the operation meta
+ * through {@code Deencapsulation}; the test then looks for the status reason and the exception
+ * message in {@code netSocketBuffer}.
+ */
+ @Test
+ public void testFlowControlQps() {
+ MockUtil.getInstance().mockHighwayCodec();
+ SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class);
+ OperationMeta operationMeta = schemaMeta.ensureFindOperation("add");
+ operationMeta.setExecutor(new ReactiveExecutor());
+ // Write the private fields directly so the operation reports the stub handler below.
+ // Presumably the "searched" flag stops OperationMeta from looking the handler up itself - confirm.
+ // NOTE(review): unitTestMeta is static, and the injected handler is never cleared in this method;
+ // verify that it does not leak into other tests.
+ Deencapsulation.setField(operationMeta, "providerQpsFlowControlHandlerSearched", true);
+ Deencapsulation.setField(operationMeta, "providerQpsFlowControlHandler",
+ (Handler) (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException(
+ new HttpStatus(429, "Too Many Requests"),
+ new CommonExceptionData("rejected by qps flowcontrol"))));
+
+ RequestHeader requestHeader = MockUtil.getInstance().requestHeader;
+ requestHeader.setDestMicroservice(schemaMeta.getMicroserviceName());
+ requestHeader.setSchemaId(schemaMeta.getSchemaId());
+ requestHeader.setOperationName(operationMeta.getOperationId());
+
+ HighwayServerInvoke highwayServerInvoke = new HighwayServerInvoke();
+
+ assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
+ MockUtil.getInstance().decodeRequestSucc = true;
+
+ // NOTE(review): the handler is injected directly above, so it is unclear whether these
+ // properties influence the outcome - confirm.
+ ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.enabled", "true");
+ ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", "1");
+
+ highwayServerInvoke.execute();
+ // The rejection is expected to carry both the status reason and the exception message.
+ String bodyString = Buffer.buffer(netSocketBuffer).toString();
+ assertTrue(bodyString.contains("Too Many Requests"));
+ assertTrue(bodyString.contains("rejected by qps flowcontrol"));
+
+ // NOTE(review): not in a finally block, so a failed assertion above skips this reset.
+ ArchaiusUtils.resetConfig();
}
}