You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/12/06 10:06:48 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1056] put provider QPS flow control in front, for highway transport

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f2d3da  [SCB-1056] put provider QPS flow control in front, for highway transport
3f2d3da is described below

commit 3f2d3da028cf11eb52516c7710671236c4fc102f
Author: yaohaishi <ya...@huawei.com>
AuthorDate: Thu Dec 6 17:36:26 2018 +0800

    [SCB-1056] put provider QPS flow control in front, for highway transport
---
 .../transport/highway/HighwayServerInvoke.java     | 33 ++++++++++-
 .../transport/highway/TestHighwayCodec.java        |  5 +-
 .../transport/highway/TestHighwayServerInvoke.java | 69 ++++++++++++++++++----
 3 files changed, 90 insertions(+), 17 deletions(-)

diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 20e8e08..32bfb67 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -20,12 +20,14 @@ package org.apache.servicecomb.transport.highway;
 import java.util.Map;
 
 import javax.ws.rs.core.Response.Status;
+import javax.xml.ws.Holder;
 
 import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
 import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.SCBEngine;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
@@ -181,9 +183,38 @@ public class HighwayServerInvoke {
           null);
       invocation.onStart(null, start);
       invocation.getInvocationStageTrace().startSchedule();
-      operationMeta.getExecutor().execute(() -> runInExecutor());
+
+      // copied from HighwayCodec#decodeRequest()
+      // temporary QPS enhancement; to be removed once the handler mechanism is refactored
+      invocation.mergeContext(header.getContext());
+
+      Holder<Boolean> qpsFlowControlReject = checkQpsFlowControl(operationMeta);
+      if (qpsFlowControlReject.value) {
+        return;
+      }
+
+      operationMeta.getExecutor().execute(this::runInExecutor);
     } catch (IllegalStateException e) {
       sendResponse(header.getContext(), Response.providerFailResp(e));
     }
   }
+
+  private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
+    Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
+    @SuppressWarnings("deprecation")
+    Handler providerQpsFlowControlHandler = operationMeta.getProviderQpsFlowControlHandler();
+    if (null != providerQpsFlowControlHandler) {
+      try {
+        providerQpsFlowControlHandler.handle(invocation, response -> {
+          qpsFlowControlReject.value = true;
+          sendResponse(header.getContext(), response);
+        });
+      } catch (Exception e) {
+        LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
+        qpsFlowControlReject.value = true;
+        sendResponse(header.getContext(), Response.providerFailResp(e));
+      }
+    }
+    return qpsFlowControlReject;
+  }
 }
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
index a9932e2..ad3fedd 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
@@ -37,7 +37,6 @@ import org.apache.servicecomb.serviceregistry.RegistryUtils;
 import org.apache.servicecomb.serviceregistry.ServiceRegistry;
 import org.apache.servicecomb.serviceregistry.registry.ServiceRegistryFactory;
 import org.apache.servicecomb.swagger.invocation.Response;
-import org.apache.servicecomb.swagger.invocation.context.InvocationContext;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
 import org.apache.servicecomb.transport.highway.message.ResponseHeader;
 import org.junit.After;
@@ -81,7 +80,7 @@ public class TestHighwayCodec {
   }
 
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
     ServiceRegistry serviceRegistry = ServiceRegistryFactory.createLocal();
     serviceRegistry.init();
     RegistryUtils.setServiceRegistry(serviceRegistry);
@@ -106,7 +105,7 @@ public class TestHighwayCodec {
   }
 
   @After
-  public void tearDown() throws Exception {
+  public void tearDown() {
 
     header = null;
 
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
index 7381dee..b4e202b 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
@@ -17,8 +17,13 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
 import javax.xml.ws.Holder;
 
+import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.SCBEngine;
 import org.apache.servicecomb.core.SCBStatus;
 import org.apache.servicecomb.core.definition.OperationMeta;
@@ -28,7 +33,11 @@ import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.executor.ReactiveExecutor;
 import org.apache.servicecomb.core.unittest.UnitTestMeta;
 import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
+import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
+import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.transport.common.MockUtil;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
 import org.junit.AfterClass;
@@ -44,6 +53,7 @@ import io.netty.buffer.ByteBuf;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.NetSocket;
 import io.vertx.core.net.SocketAddress;
+import mockit.Deencapsulation;
 import mockit.Mock;
 import mockit.MockUp;
 
@@ -54,7 +64,7 @@ public class TestHighwayServerInvoke {
     }
   }
 
-  private UnitTestMeta unitTestMeta;
+  private static UnitTestMeta unitTestMeta;
 
   private ByteBuf netSocketBuffer;
 
@@ -70,6 +80,7 @@ public class TestHighwayServerInvoke {
   public static void classSetup() {
     EventManager.eventBus = new EventBus();
     SCBEngine.getInstance().setStatus(SCBStatus.UP);
+    unitTestMeta = new UnitTestMeta();
 
     new MockUp<System>() {
       @Mock
@@ -87,7 +98,6 @@ public class TestHighwayServerInvoke {
 
   @Before
   public void setup() {
-    unitTestMeta = new UnitTestMeta();
     socketAddress = new MockUp<SocketAddress>() {
       @Mock
       public String host() {
@@ -153,22 +163,55 @@ public class TestHighwayServerInvoke {
     requestHeader.setDestMicroservice(schemaMeta.getMicroserviceName());
     requestHeader.setSchemaId(schemaMeta.getSchemaId());
     requestHeader.setOperationName(operationMeta.getOperationId());
-    Assert.assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
+    assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
 
     // exe失败
     MockUtil.getInstance().decodeRequestSucc = false;
     highwayServerInvoke.execute();
     EventManager.unregister(subscriber);
 
-    Assert.assertEquals(true, Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
-    Assert.assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
-    Assert.assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
-    Assert.assertTrue(highwayServerInvoke.invocation.getInvocationStageTrace().getStartExecution() != 0);
-    Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStart());
-    Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
-    Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
-    Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartSchedule());
-    Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
-    Assert.assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+    assertTrue(Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
+    assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
+    assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
+    assertTrue(highwayServerInvoke.invocation.getInvocationStageTrace().getStartExecution() != 0);
+    assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStart());
+    assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
+    assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+    assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartSchedule());
+    assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getStartHandlersRequest());
+    assertEquals(1, highwayServerInvoke.invocation.getInvocationStageTrace().getFinishHandlersResponse());
+  }
+
+  @Test
+  public void testFlowControlQps() {
+    MockUtil.getInstance().mockHighwayCodec();
+    SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class);
+    OperationMeta operationMeta = schemaMeta.ensureFindOperation("add");
+    operationMeta.setExecutor(new ReactiveExecutor());
+    Deencapsulation.setField(operationMeta, "providerQpsFlowControlHandlerSearched", true);
+    Deencapsulation.setField(operationMeta, "providerQpsFlowControlHandler",
+        (Handler) (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException(
+            new HttpStatus(429, "Too Many Requests"),
+            new CommonExceptionData("rejected by qps flowcontrol"))));
+
+    RequestHeader requestHeader = MockUtil.getInstance().requestHeader;
+    requestHeader.setDestMicroservice(schemaMeta.getMicroserviceName());
+    requestHeader.setSchemaId(schemaMeta.getSchemaId());
+    requestHeader.setOperationName(operationMeta.getOperationId());
+
+    HighwayServerInvoke highwayServerInvoke = new HighwayServerInvoke();
+
+    assertTrue(highwayServerInvoke.init(connection, 0, requestHeader, null));
+    MockUtil.getInstance().decodeRequestSucc = true;
+
+    ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.enabled", "true");
+    ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", "1");
+
+    highwayServerInvoke.execute();
+    String bodyString = Buffer.buffer(netSocketBuffer).toString();
+    assertTrue(bodyString.contains("Too Many Requests"));
+    assertTrue(bodyString.contains("rejected by qps flowcontrol"));
+
+    ArchaiusUtils.resetConfig();
   }
 }