You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/12/04 13:54:59 UTC

[servicecomb-java-chassis] branch master updated (e0e8df7 -> a216111)

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git.


    from e0e8df7  [SCB-1047]microservice.yaml service_description.version support format xxx.xx.xxx.xxx: modify details
     new dfe51a7  [SCB-1056] put provider flow control logic in front
     new a216111  [SCB-1056] hold ProviderQpsFlowControlHandler in OperationMeta

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../common/rest/AbstractRestInvocation.java        |  40 ++++++-
 .../common/rest/TestAbstractRestInvocation.java    | 129 +++++++++++++++++----
 .../servicecomb/core/definition/OperationMeta.java |  29 +++++
 .../qps/ProviderQpsFlowControlHandler.java         |  16 ++-
 .../org/apache/servicecomb/qps/QpsController.java  |   4 +-
 .../qps/TestProviderQpsFlowControlHandler.java     |  54 +++++----
 6 files changed, 214 insertions(+), 58 deletions(-)


[servicecomb-java-chassis] 01/02: [SCB-1056] put provider flow control logic in front

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit dfe51a756f0af8261f240d132ae8aab6c76797e4
Author: yaohaishi <ya...@huawei.com>
AuthorDate: Thu Nov 29 19:07:08 2018 +0800

    [SCB-1056] put provider flow control logic in front
---
 .../common/rest/AbstractRestInvocation.java        |  55 ++++++++-
 .../common/rest/TestAbstractRestInvocation.java    | 128 +++++++++++++++++----
 .../qps/ProviderQpsFlowControlHandler.java         |  16 ++-
 .../org/apache/servicecomb/qps/QpsController.java  |   4 +-
 .../qps/TestProviderQpsFlowControlHandler.java     |  54 +++++----
 5 files changed, 199 insertions(+), 58 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 932aef4..5f04c01 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
 
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.Status;
+import javax.xml.ws.Holder;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
@@ -36,6 +37,7 @@ import org.apache.servicecomb.common.rest.filter.HttpServerFilterBeforeSendRespo
 import org.apache.servicecomb.common.rest.locator.OperationLocator;
 import org.apache.servicecomb.common.rest.locator.ServicePathManager;
 import org.apache.servicecomb.core.Const;
+import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.OperationMeta;
@@ -47,6 +49,8 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class AbstractRestInvocation {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRestInvocation.class);
 
@@ -126,6 +130,19 @@ public abstract class AbstractRestInvocation {
     invocation.getInvocationStageTrace().startSchedule();
     OperationMeta operationMeta = restOperationMeta.getOperationMeta();
 
+    try {
+      this.setContext();
+    } catch (Exception e) {
+      LOGGER.error("failed to set invocation context", e);
+      sendFailResponse(e);
+      return;
+    }
+
+    Holder<Boolean> qpsFlowControlReject = checkQpsFlowControl(operationMeta);
+    if (qpsFlowControlReject.value) {
+      return;
+    }
+
     operationMeta.getExecutor().execute(() -> {
       synchronized (this.requestEx) {
         try {
@@ -150,6 +167,39 @@ public abstract class AbstractRestInvocation {
     });
   }
 
+  private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
+    Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
+    Handler providerQpsFlowControlHandler = findQpsFlowControlHandler(operationMeta);
+    if (null != providerQpsFlowControlHandler) {
+      try {
+        providerQpsFlowControlHandler.handle(invocation, response -> {
+          qpsFlowControlReject.value = true;
+          produceProcessor = ProduceProcessorManager.JSON_PROCESSOR;
+          sendResponse(response);
+        });
+      } catch (Exception e) {
+        LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
+        qpsFlowControlReject.value = true;
+        sendFailResponse(e);
+      }
+    }
+    return qpsFlowControlReject;
+  }
+
+  @VisibleForTesting
+  Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
+    final List<Handler> providerHandlerChain = operationMeta.getSchemaMeta().getProviderHandlerChain();
+    Handler providerQpsFlowControlHandler = null;
+    for (Handler handler : providerHandlerChain) {
+      // matching by class name is more or less better than importing an extra maven dependency
+      if ("org.apache.servicecomb.qps.ProviderQpsFlowControlHandler".equals(handler.getClass().getName())) {
+        providerQpsFlowControlHandler = handler;
+        break;
+      }
+    }
+    return providerQpsFlowControlHandler;
+  }
+
   private boolean isInQueueTimeout() {
     return System.nanoTime() - invocation.getInvocationStageTrace().getStart() >
         CommonRestConfig.getRequestWaitInPoolTimeout() * 1_000_000;
@@ -183,7 +233,6 @@ public abstract class AbstractRestInvocation {
 
   protected Response prepareInvoke() throws Throwable {
     this.initProduceProcessor();
-    this.setContext();
     invocation.getHandlerContext().put(RestConst.REST_REQUEST, requestEx);
 
     invocation.getInvocationStageTrace().startServerFiltersRequest();
@@ -201,9 +250,7 @@ public abstract class AbstractRestInvocation {
 
   protected void doInvoke() throws Throwable {
     invocation.getInvocationStageTrace().startHandlersRequest();
-    invocation.next(resp -> {
-      sendResponseQuietly(resp);
-    });
+    invocation.next(resp -> sendResponseQuietly(resp));
   }
 
   public void sendFailResponse(Throwable throwable) {
diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index faf37a5..3db0dbf 100644
--- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.common.rest;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -49,10 +51,11 @@ import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
 import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
 import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletRequest;
+import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletResponse;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
-import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.swagger.invocation.response.Headers;
@@ -188,8 +191,8 @@ public class TestAbstractRestInvocation {
       restInvocation.initProduceProcessor();
       Assert.fail("must throw exception");
     } catch (InvocationException e) {
-      Assert.assertEquals(Status.NOT_ACCEPTABLE, e.getStatus());
-      Assert.assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage());
+      assertEquals(Status.NOT_ACCEPTABLE, e.getStatus());
+      assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage());
     }
   }
 
@@ -253,7 +256,7 @@ public class TestAbstractRestInvocation {
   @Test
   public void getContext() {
     invocation.addContext("key", "test");
-    Assert.assertEquals("test", restInvocation.getContext("key"));
+    assertEquals("test", restInvocation.getContext("key"));
   }
 
   @Test
@@ -401,7 +404,7 @@ public class TestAbstractRestInvocation {
 
     restInvocation.invoke();
 
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest());
   }
 
   @Test
@@ -571,7 +574,7 @@ public class TestAbstractRestInvocation {
     initRestInvocation();
 
     restInvocation.sendResponse(response);
-    Assert.assertEquals(expected, result);
+    assertEquals(expected, result);
   }
 
   @Test
@@ -658,7 +661,7 @@ public class TestAbstractRestInvocation {
       restInvocation.sendResponse(response);
       Assert.fail("must throw exception");
     } catch (Error e) {
-      Assert.assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap());
+      assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap());
     }
   }
 
@@ -693,8 +696,8 @@ public class TestAbstractRestInvocation {
     initRestInvocation();
 
     restInvocation.sendResponse(response);
-    Assert.assertEquals("\"ok\"", buffer.toString());
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse());
+    assertEquals("\"ok\"", buffer.toString());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse());
   }
 
   @Test
@@ -747,7 +750,7 @@ public class TestAbstractRestInvocation {
     restInvocation.setHttpServerFilters(httpServerFilters);
 
     restInvocation.sendResponse(response);
-    Assert.assertEquals("\"ok\"-filter", buffer.toString());
+    assertEquals("\"ok\"-filter", buffer.toString());
   }
 
   @Test
@@ -905,10 +908,10 @@ public class TestAbstractRestInvocation {
     EventManager.unregister(subscriber);
 
     Assert.assertTrue(result.value);
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart());
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule());
     Assert.assertSame(invocation, eventHolder.value.getInvocation());
-    Assert.assertEquals("tid", invocation.getTraceId());
+    assertEquals("tid", invocation.getTraceId());
   }
 
   @Test
@@ -936,19 +939,14 @@ public class TestAbstractRestInvocation {
 
     Assert.assertTrue(result.value);
     Assert.assertSame(invocation, restInvocation.invocation);
-    Assert.assertEquals(time, invocation.getInvocationStageTrace().getStartExecution());
+    assertEquals(time, invocation.getInvocationStageTrace().getStartExecution());
   }
 
   @Test
   public void doInvoke(@Mocked Endpoint endpoint, @Mocked OperationMeta operationMeta,
       @Mocked Object[] swaggerArguments, @Mocked SchemaMeta schemaMeta) throws Throwable {
     Response response = Response.ok("ok");
-    Handler handler = new Handler() {
-      @Override
-      public void handle(Invocation invocation, AsyncResponse asyncResp) {
-        asyncResp.complete(response);
-      }
-    };
+    Handler handler = (invocation, asyncResp) -> asyncResp.complete(response);
     List<Handler> handlerChain = Arrays.asList(handler);
     Deencapsulation.setField(invocation, "handlerList", handlerChain);
 
@@ -964,7 +962,93 @@ public class TestAbstractRestInvocation {
     restInvocation.doInvoke();
 
     Assert.assertSame(response, result.value);
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest());
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse());
+  }
+
+  @Test
+  public void scheduleInvocation_invocationContextDeserializeError() {
+    requestEx = new AbstractHttpServletRequest() {
+      @Override
+      public String getHeader(String name) {
+        return "{\"x-cse-src-microservice\":'source\"}";
+      }
+    };
+    Holder<Integer> status = new Holder<>();
+    Holder<String> reasonPhrase = new Holder<>();
+    Holder<Integer> endCount = new Holder<>(0);
+    responseEx = new AbstractHttpServletResponse() {
+      @SuppressWarnings("deprecation")
+      @Override
+      public void setStatus(int sc, String sm) {
+        status.value = sc;
+        reasonPhrase.value = sm;
+      }
+
+      @Override
+      public void flushBuffer() {
+        endCount.value = endCount.value + 1;
+      }
+
+      @Override
+      public void setContentType(String type) {
+        assertEquals("application/json; charset=utf-8", type);
+      }
+    };
+    restInvocation.requestEx = requestEx;
+    restInvocation.responseEx = responseEx;
+
+    restInvocation.scheduleInvocation();
+
+    assertEquals(Integer.valueOf(590), status.value);
+    assertEquals("Cse Internal Server Error", reasonPhrase.value);
+    assertEquals(Integer.valueOf(1), endCount.value);
+  }
+
+  @Test
+  public void scheduleInvocation_flowControlReject() {
+    restInvocation = new AbstractRestInvocationForTest() {
+      @Override
+      Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
+        return (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException(
+            new HttpStatus(429, "Too Many Requests"),
+            new CommonExceptionData("rejected by qps flowcontrol")));
+      }
+    };
+    Holder<Integer> status = new Holder<>();
+    Holder<String> reasonPhrase = new Holder<>();
+    Holder<Integer> endCount = new Holder<>(0);
+    Holder<String> responseBody = new Holder<>();
+    responseEx = new AbstractHttpServletResponse() {
+      @SuppressWarnings("deprecation")
+      @Override
+      public void setStatus(int sc, String sm) {
+        status.value = sc;
+        reasonPhrase.value = sm;
+      }
+
+      @Override
+      public void flushBuffer() {
+        endCount.value = endCount.value + 1;
+      }
+
+      @Override
+      public void setContentType(String type) {
+        assertEquals("application/json; charset=utf-8", type);
+      }
+
+      @Override
+      public void setBodyBuffer(Buffer bodyBuffer) {
+        responseBody.value = bodyBuffer.toString();
+      }
+    };
+    setup();
+
+    restInvocation.scheduleInvocation();
+
+    assertEquals(Integer.valueOf(429), status.value);
+    assertEquals("Too Many Requests", reasonPhrase.value);
+    assertEquals("{\"message\":\"rejected by qps flowcontrol\"}", responseBody.value);
+    assertEquals(Integer.valueOf(1), endCount.value);
   }
 }
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
index 88ee936..67ade94 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
@@ -32,21 +32,25 @@ public class ProviderQpsFlowControlHandler implements Handler {
 
   @Override
   public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
-    if (!Config.INSTANCE.isProviderEnabled()) {
+    if (invocation.getHandlerIndex() > 0) {
+      // handlerIndex > 0, which means this handler is executed in handler chain.
+      // As this flow control logic has been executed in advance, this time it should be ignored.
       invocation.next(asyncResp);
       return;
     }
 
+    // The real executing position of this handler is no longer in handler chain, but in AbstractRestInvocation.
+    // Therefore, the Invocation#next() method should not be called below.
+    if (!Config.INSTANCE.isProviderEnabled()) {
+      return;
+    }
+
     String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE);
     QpsController qpsController =
         StringUtils.isEmpty(microserviceName)
             ? qpsControllerMgr.getGlobalQpsController()
             : qpsControllerMgr.getOrCreate(microserviceName, invocation);
-    if (isLimitNewRequest(qpsController, asyncResp)) {
-      return;
-    }
-
-    invocation.next(asyncResp);
+    isLimitNewRequest(qpsController, asyncResp);
   }
 
   private boolean isLimitNewRequest(QpsController qpsController, AsyncResponse asyncResp) {
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
index 88c8074..5f294ba 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
@@ -59,15 +59,13 @@ public class QpsController {
     long msNow = System.currentTimeMillis();
     //Time jump cause the new request injected
     if (msNow - msCycleBegin > CYCLE_LENGTH || msNow < msCycleBegin) {
-     
-      //no need worry about concurrency porbleam  
+      //no need to worry about concurrency problems
       lastRequestCount = newCount;
       msCycleBegin = msNow;
     }
 
     // Configuration update and use is at the situation of multi-threaded concurrency
     // It is possible that operation level updated to null,but schema level or microservice level does not updated
-    
     int limitValue = (qpsLimit == null) ? Integer.MAX_VALUE : qpsLimit;
     return newCount - lastRequestCount >= limitValue;
   }
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
index f671eea..9f73155 100644
--- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
@@ -18,6 +18,8 @@
 package org.apache.servicecomb.qps;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.times;
 
 import org.apache.servicecomb.core.Const;
@@ -59,7 +61,6 @@ public class TestProviderQpsFlowControlHandler {
     ArchaiusUtils.setProperty(Config.PROVIDER_LIMIT_KEY_PREFIX + "test", 1);
   }
 
-
   @After
   public void afterTest() {
     ArchaiusUtils.resetConfig();
@@ -71,6 +72,8 @@ public class TestProviderQpsFlowControlHandler {
       final @Injectable AsyncResponse asyncResp) throws Exception {
     new Expectations() {
       {
+        invocation.getHandlerIndex();
+        result = 0;
         invocation.getContext(Const.SRC_MICROSERVICE);
         result = "test";
         invocation.getOperationMeta();
@@ -95,25 +98,42 @@ public class TestProviderQpsFlowControlHandler {
     gHandler.handle(invocation, asyncResp);
   }
 
-
   @Test
   public void testQpsController() {
     mockUpSystemTime();
     QpsController qpsController = new QpsController("abc", 100);
-    assertEquals(false, qpsController.isLimitNewRequest());
+    assertFalse(qpsController.isLimitNewRequest());
 
     qpsController.setQpsLimit(1);
-    assertEquals(true, qpsController.isLimitNewRequest());
+    assertTrue(qpsController.isLimitNewRequest());
   }
 
   @Test
   public void testHandleOnSourceMicroserviceNameIsNull() throws Exception {
     Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null);
+    // only when handler index <= 0, the qps logic works
+    Mockito.when(invocation.getHandlerIndex()).thenReturn(0);
+    ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1);
+    ProviderQpsFlowControlHandler.qpsControllerMgr
+        .setGlobalQpsController("servicecomb.flowcontrol.Provider.qps.global.limit");
+
+    handler.handle(invocation, asyncResp);
+    handler.handle(invocation, asyncResp);
+
+    // Invocation#getContext(String) is only invoked when the qps logic works
+    Mockito.verify(invocation, times(2)).getContext(Const.SRC_MICROSERVICE);
+    Mockito.verify(asyncResp, times(1)).producerFail(Mockito.any(Exception.class));
+  }
+
+  @Test
+  public void testHandleOnSourceOnHandlerIndexIsGreaterThan0() throws Exception {
+    Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null);
 
+    Mockito.when(invocation.getHandlerIndex()).thenReturn(1);
     handler.handle(invocation, asyncResp);
     handler.handle(invocation, asyncResp);
 
-    Mockito.verify(invocation, times(2)).next(asyncResp);
+    Mockito.verify(invocation, times(0)).getContext(Mockito.anyString());
   }
 
   @Test
@@ -122,25 +142,19 @@ public class TestProviderQpsFlowControlHandler {
     OperationMeta mockOperationMeta = QpsControllerManagerTest.getMockOperationMeta("pojo", "server", "opr");
     Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta);
     Mockito.when(invocation.getSchemaId()).thenReturn("server");
-    new MockUp<QpsController>() {
-      @Mock
-      public boolean isLimitNewRequest() {
-        return true;
-      }
-    };
 
     new MockUp<QpsControllerManager>() {
-
       @Mock
       protected QpsController create(String qualifiedNameKey) {
-        return new QpsController(qualifiedNameKey, 12);
+        return new QpsController(qualifiedNameKey, 1);
       }
     };
 
     handler.handle(invocation, asyncResp);
+    handler.handle(invocation, asyncResp);
 
     ArgumentCaptor<InvocationException> captor = ArgumentCaptor.forClass(InvocationException.class);
-    Mockito.verify(asyncResp).producerFail(captor.capture());
+    Mockito.verify(asyncResp, times(1)).producerFail(captor.capture());
 
     InvocationException invocationException = captor.getValue();
     assertEquals(QpsConst.TOO_MANY_REQUESTS_STATUS, invocationException.getStatus());
@@ -155,23 +169,17 @@ public class TestProviderQpsFlowControlHandler {
         .getMockOperationMeta("pojo", "server", "opr");
     Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta);
     Mockito.when(invocation.getSchemaId()).thenReturn("server");
-    new MockUp<QpsController>() {
-      @Mock
-      public boolean isLimitNewRequest() {
-        return false;
-      }
-    };
 
     new MockUp<QpsControllerManager>() {
-
       @Mock
       protected QpsController create(String qualifiedNameKey) {
-        return new QpsController(qualifiedNameKey, 12);
+        return new QpsController(qualifiedNameKey, 1);
       }
     };
     handler.handle(invocation, asyncResp);
 
-    Mockito.verify(invocation).next(asyncResp);
+    Mockito.verify(invocation, times(0)).next(asyncResp);
+    Mockito.verify(asyncResp, times(0)).producerFail(Mockito.any(Exception.class));
   }
 
   private void mockUpSystemTime() {


[servicecomb-java-chassis] 02/02: [SCB-1056] hold ProviderQpsFlowControlHandler in OperationMeta

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit a21611163909bd075f3f7c737474e44c95f4c062
Author: yaohaishi <ya...@huawei.com>
AuthorDate: Tue Dec 4 15:06:52 2018 +0800

    [SCB-1056] hold ProviderQpsFlowControlHandler in OperationMeta
---
 .../common/rest/AbstractRestInvocation.java        | 19 ++------------
 .../common/rest/TestAbstractRestInvocation.java    |  9 ++++---
 .../servicecomb/core/definition/OperationMeta.java | 29 ++++++++++++++++++++++
 3 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 5f04c01..d2b08aa 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -49,8 +49,6 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 public abstract class AbstractRestInvocation {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRestInvocation.class);
 
@@ -169,7 +167,8 @@ public abstract class AbstractRestInvocation {
 
   private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
     Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
-    Handler providerQpsFlowControlHandler = findQpsFlowControlHandler(operationMeta);
+    @SuppressWarnings("deprecation")
+    Handler providerQpsFlowControlHandler = operationMeta.getProviderQpsFlowControlHandler();
     if (null != providerQpsFlowControlHandler) {
       try {
         providerQpsFlowControlHandler.handle(invocation, response -> {
@@ -186,20 +185,6 @@ public abstract class AbstractRestInvocation {
     return qpsFlowControlReject;
   }
 
-  @VisibleForTesting
-  Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
-    final List<Handler> providerHandlerChain = operationMeta.getSchemaMeta().getProviderHandlerChain();
-    Handler providerQpsFlowControlHandler = null;
-    for (Handler handler : providerHandlerChain) {
-      // matching by class name is more or less better than importing an extra maven dependency
-      if ("org.apache.servicecomb.qps.ProviderQpsFlowControlHandler".equals(handler.getClass().getName())) {
-        providerQpsFlowControlHandler = handler;
-        break;
-      }
-    }
-    return providerQpsFlowControlHandler;
-  }
-
   private boolean isInQueueTimeout() {
     return System.nanoTime() - invocation.getInvocationStageTrace().getStart() >
         CommonRestConfig.getRequestWaitInPoolTimeout() * 1_000_000;
diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index 3db0dbf..5a8c859 100644
--- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -1005,12 +1005,13 @@ public class TestAbstractRestInvocation {
     assertEquals(Integer.valueOf(1), endCount.value);
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void scheduleInvocation_flowControlReject() {
-    restInvocation = new AbstractRestInvocationForTest() {
-      @Override
-      Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
-        return (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException(
+    new Expectations() {
+      {
+        operationMeta.getProviderQpsFlowControlHandler();
+        result = (Handler) (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException(
             new HttpStatus(429, "Too Many Requests"),
             new CommonExceptionData("rejected by qps flowcontrol")));
       }
diff --git a/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java b/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java
index 4e34716..cf90825 100644
--- a/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java
+++ b/core/src/main/java/org/apache/servicecomb/core/definition/OperationMeta.java
@@ -18,11 +18,13 @@
 package org.apache.servicecomb.core.definition;
 
 import java.lang.reflect.Method;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
+import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.executor.ExecutorManager;
 import org.apache.servicecomb.swagger.invocation.response.ResponseMeta;
 import org.apache.servicecomb.swagger.invocation.response.ResponsesMeta;
@@ -58,6 +60,12 @@ public class OperationMeta {
   // 为避免每个地方都做复杂的层次管理,直接在这里保存扩展数据
   private Map<String, Object> extData = new ConcurrentHashMap<>();
 
+  // providerQpsFlowControlHandler is a temporary field, only for internal usage
+  private Handler providerQpsFlowControlHandler;
+
+  // providerQpsFlowControlHandlerSearched is a temporary field, only for internal usage
+  private boolean providerQpsFlowControlHandlerSearched;
+
   private String transport = null;
 
   public void init(SchemaMeta schemaMeta, Method method, String operationPath, String httpMethod,
@@ -158,4 +166,25 @@ public class OperationMeta {
   public int getParamSize() {
     return swaggerOperation.getParameters().size();
   }
+
+  /**
+   * Only for JavaChassis internal usage.
+   */
+  @Deprecated
+  public Handler getProviderQpsFlowControlHandler() {
+    if (providerQpsFlowControlHandlerSearched) {
+      return providerQpsFlowControlHandler;
+    }
+
+    final List<Handler> providerHandlerChain = getSchemaMeta().getProviderHandlerChain();
+    for (Handler handler : providerHandlerChain) {
+      // matching by class name is more or less better than importing an extra maven dependency
+      if ("org.apache.servicecomb.qps.ProviderQpsFlowControlHandler".equals(handler.getClass().getName())) {
+        providerQpsFlowControlHandler = handler;
+        break;
+      }
+    }
+    providerQpsFlowControlHandlerSearched = true;
+    return providerQpsFlowControlHandler;
+  }
 }