You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/12/04 13:55:00 UTC

[servicecomb-java-chassis] 01/02: [SCB-1056] put provider flow control logic in front

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit dfe51a756f0af8261f240d132ae8aab6c76797e4
Author: yaohaishi <ya...@huawei.com>
AuthorDate: Thu Nov 29 19:07:08 2018 +0800

    [SCB-1056] put provider flow control logic in front
---
 .../common/rest/AbstractRestInvocation.java        |  55 ++++++++-
 .../common/rest/TestAbstractRestInvocation.java    | 128 +++++++++++++++++----
 .../qps/ProviderQpsFlowControlHandler.java         |  16 ++-
 .../org/apache/servicecomb/qps/QpsController.java  |   4 +-
 .../qps/TestProviderQpsFlowControlHandler.java     |  54 +++++----
 5 files changed, 199 insertions(+), 58 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 932aef4..5f04c01 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
 
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.Status;
+import javax.xml.ws.Holder;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
@@ -36,6 +37,7 @@ import org.apache.servicecomb.common.rest.filter.HttpServerFilterBeforeSendRespo
 import org.apache.servicecomb.common.rest.locator.OperationLocator;
 import org.apache.servicecomb.common.rest.locator.ServicePathManager;
 import org.apache.servicecomb.core.Const;
+import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.OperationMeta;
@@ -47,6 +49,8 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public abstract class AbstractRestInvocation {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRestInvocation.class);
 
@@ -126,6 +130,19 @@ public abstract class AbstractRestInvocation {
     invocation.getInvocationStageTrace().startSchedule();
     OperationMeta operationMeta = restOperationMeta.getOperationMeta();
 
+    try {
+      this.setContext();
+    } catch (Exception e) {
+      LOGGER.error("failed to set invocation context", e);
+      sendFailResponse(e);
+      return;
+    }
+
+    Holder<Boolean> qpsFlowControlReject = checkQpsFlowControl(operationMeta);
+    if (qpsFlowControlReject.value) {
+      return;
+    }
+
     operationMeta.getExecutor().execute(() -> {
       synchronized (this.requestEx) {
         try {
@@ -150,6 +167,39 @@ public abstract class AbstractRestInvocation {
     });
   }
 
+  private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
+    Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
+    Handler providerQpsFlowControlHandler = findQpsFlowControlHandler(operationMeta);
+    if (null != providerQpsFlowControlHandler) {
+      try {
+        providerQpsFlowControlHandler.handle(invocation, response -> {
+          qpsFlowControlReject.value = true;
+          produceProcessor = ProduceProcessorManager.JSON_PROCESSOR;
+          sendResponse(response);
+        });
+      } catch (Exception e) {
+        LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
+        qpsFlowControlReject.value = true;
+        sendFailResponse(e);
+      }
+    }
+    return qpsFlowControlReject;
+  }
+
+  @VisibleForTesting
+  Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
+    final List<Handler> providerHandlerChain = operationMeta.getSchemaMeta().getProviderHandlerChain();
+    Handler providerQpsFlowControlHandler = null;
+    for (Handler handler : providerHandlerChain) {
+      // match by class name: this avoids adding a maven dependency on the qps flow control handler module
+      if ("org.apache.servicecomb.qps.ProviderQpsFlowControlHandler".equals(handler.getClass().getName())) {
+        providerQpsFlowControlHandler = handler;
+        break;
+      }
+    }
+    return providerQpsFlowControlHandler;
+  }
+
   private boolean isInQueueTimeout() {
     return System.nanoTime() - invocation.getInvocationStageTrace().getStart() >
         CommonRestConfig.getRequestWaitInPoolTimeout() * 1_000_000;
@@ -183,7 +233,6 @@ public abstract class AbstractRestInvocation {
 
   protected Response prepareInvoke() throws Throwable {
     this.initProduceProcessor();
-    this.setContext();
     invocation.getHandlerContext().put(RestConst.REST_REQUEST, requestEx);
 
     invocation.getInvocationStageTrace().startServerFiltersRequest();
@@ -201,9 +250,7 @@ public abstract class AbstractRestInvocation {
 
   protected void doInvoke() throws Throwable {
     invocation.getInvocationStageTrace().startHandlersRequest();
-    invocation.next(resp -> {
-      sendResponseQuietly(resp);
-    });
+    invocation.next(resp -> sendResponseQuietly(resp));
   }
 
   public void sendFailResponse(Throwable throwable) {
diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index faf37a5..3db0dbf 100644
--- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -17,6 +17,8 @@
 
 package org.apache.servicecomb.common.rest;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -49,10 +51,11 @@ import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
 import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
 import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletRequest;
+import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletResponse;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
-import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.context.HttpStatus;
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.swagger.invocation.response.Headers;
@@ -188,8 +191,8 @@ public class TestAbstractRestInvocation {
       restInvocation.initProduceProcessor();
       Assert.fail("must throw exception");
     } catch (InvocationException e) {
-      Assert.assertEquals(Status.NOT_ACCEPTABLE, e.getStatus());
-      Assert.assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage());
+      assertEquals(Status.NOT_ACCEPTABLE, e.getStatus());
+      assertEquals("Accept null is not supported", ((CommonExceptionData) e.getErrorData()).getMessage());
     }
   }
 
@@ -253,7 +256,7 @@ public class TestAbstractRestInvocation {
   @Test
   public void getContext() {
     invocation.addContext("key", "test");
-    Assert.assertEquals("test", restInvocation.getContext("key"));
+    assertEquals("test", restInvocation.getContext("key"));
   }
 
   @Test
@@ -401,7 +404,7 @@ public class TestAbstractRestInvocation {
 
     restInvocation.invoke();
 
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartServerFiltersRequest());
   }
 
   @Test
@@ -571,7 +574,7 @@ public class TestAbstractRestInvocation {
     initRestInvocation();
 
     restInvocation.sendResponse(response);
-    Assert.assertEquals(expected, result);
+    assertEquals(expected, result);
   }
 
   @Test
@@ -658,7 +661,7 @@ public class TestAbstractRestInvocation {
       restInvocation.sendResponse(response);
       Assert.fail("must throw exception");
     } catch (Error e) {
-      Assert.assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap());
+      assertEquals(headers.getHeaderMap(), resultHeaders.getHeaderMap());
     }
   }
 
@@ -693,8 +696,8 @@ public class TestAbstractRestInvocation {
     initRestInvocation();
 
     restInvocation.sendResponse(response);
-    Assert.assertEquals("\"ok\"", buffer.toString());
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse());
+    assertEquals("\"ok\"", buffer.toString());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishServerFiltersResponse());
   }
 
   @Test
@@ -747,7 +750,7 @@ public class TestAbstractRestInvocation {
     restInvocation.setHttpServerFilters(httpServerFilters);
 
     restInvocation.sendResponse(response);
-    Assert.assertEquals("\"ok\"-filter", buffer.toString());
+    assertEquals("\"ok\"-filter", buffer.toString());
   }
 
   @Test
@@ -905,10 +908,10 @@ public class TestAbstractRestInvocation {
     EventManager.unregister(subscriber);
 
     Assert.assertTrue(result.value);
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart());
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStart());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartSchedule());
     Assert.assertSame(invocation, eventHolder.value.getInvocation());
-    Assert.assertEquals("tid", invocation.getTraceId());
+    assertEquals("tid", invocation.getTraceId());
   }
 
   @Test
@@ -936,19 +939,14 @@ public class TestAbstractRestInvocation {
 
     Assert.assertTrue(result.value);
     Assert.assertSame(invocation, restInvocation.invocation);
-    Assert.assertEquals(time, invocation.getInvocationStageTrace().getStartExecution());
+    assertEquals(time, invocation.getInvocationStageTrace().getStartExecution());
   }
 
   @Test
   public void doInvoke(@Mocked Endpoint endpoint, @Mocked OperationMeta operationMeta,
       @Mocked Object[] swaggerArguments, @Mocked SchemaMeta schemaMeta) throws Throwable {
     Response response = Response.ok("ok");
-    Handler handler = new Handler() {
-      @Override
-      public void handle(Invocation invocation, AsyncResponse asyncResp) {
-        asyncResp.complete(response);
-      }
-    };
+    Handler handler = (invocation, asyncResp) -> asyncResp.complete(response);
     List<Handler> handlerChain = Arrays.asList(handler);
     Deencapsulation.setField(invocation, "handlerList", handlerChain);
 
@@ -964,7 +962,93 @@ public class TestAbstractRestInvocation {
     restInvocation.doInvoke();
 
     Assert.assertSame(response, result.value);
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest());
-    Assert.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartHandlersRequest());
+    assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishHandlersResponse());
+  }
+
+  @Test
+  public void scheduleInvocation_invocationContextDeserializeError() {
+    requestEx = new AbstractHttpServletRequest() {
+      @Override
+      public String getHeader(String name) {
+        return "{\"x-cse-src-microservice\":'source\"}";
+      }
+    };
+    Holder<Integer> status = new Holder<>();
+    Holder<String> reasonPhrase = new Holder<>();
+    Holder<Integer> endCount = new Holder<>(0);
+    responseEx = new AbstractHttpServletResponse() {
+      @SuppressWarnings("deprecation")
+      @Override
+      public void setStatus(int sc, String sm) {
+        status.value = sc;
+        reasonPhrase.value = sm;
+      }
+
+      @Override
+      public void flushBuffer() {
+        endCount.value = endCount.value + 1;
+      }
+
+      @Override
+      public void setContentType(String type) {
+        assertEquals("application/json; charset=utf-8", type);
+      }
+    };
+    restInvocation.requestEx = requestEx;
+    restInvocation.responseEx = responseEx;
+
+    restInvocation.scheduleInvocation();
+
+    assertEquals(Integer.valueOf(590), status.value);
+    assertEquals("Cse Internal Server Error", reasonPhrase.value);
+    assertEquals(Integer.valueOf(1), endCount.value);
+  }
+
+  @Test
+  public void scheduleInvocation_flowControlReject() {
+    restInvocation = new AbstractRestInvocationForTest() {
+      @Override
+      Handler findQpsFlowControlHandler(OperationMeta operationMeta) {
+        return (invocation, asyncResp) -> asyncResp.producerFail(new InvocationException(
+            new HttpStatus(429, "Too Many Requests"),
+            new CommonExceptionData("rejected by qps flowcontrol")));
+      }
+    };
+    Holder<Integer> status = new Holder<>();
+    Holder<String> reasonPhrase = new Holder<>();
+    Holder<Integer> endCount = new Holder<>(0);
+    Holder<String> responseBody = new Holder<>();
+    responseEx = new AbstractHttpServletResponse() {
+      @SuppressWarnings("deprecation")
+      @Override
+      public void setStatus(int sc, String sm) {
+        status.value = sc;
+        reasonPhrase.value = sm;
+      }
+
+      @Override
+      public void flushBuffer() {
+        endCount.value = endCount.value + 1;
+      }
+
+      @Override
+      public void setContentType(String type) {
+        assertEquals("application/json; charset=utf-8", type);
+      }
+
+      @Override
+      public void setBodyBuffer(Buffer bodyBuffer) {
+        responseBody.value = bodyBuffer.toString();
+      }
+    };
+    setup();
+
+    restInvocation.scheduleInvocation();
+
+    assertEquals(Integer.valueOf(429), status.value);
+    assertEquals("Too Many Requests", reasonPhrase.value);
+    assertEquals("{\"message\":\"rejected by qps flowcontrol\"}", responseBody.value);
+    assertEquals(Integer.valueOf(1), endCount.value);
   }
 }
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
index 88ee936..67ade94 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/ProviderQpsFlowControlHandler.java
@@ -32,21 +32,25 @@ public class ProviderQpsFlowControlHandler implements Handler {
 
   @Override
   public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
-    if (!Config.INSTANCE.isProviderEnabled()) {
+    if (invocation.getHandlerIndex() > 0) {
+      // handlerIndex > 0, which means this handler is executed in handler chain.
+      // As this flow control logic has been executed in advance, this time it should be ignored.
       invocation.next(asyncResp);
       return;
     }
 
+    // The real executing position of this handler is no longer in handler chain, but in AbstractRestInvocation.
+    // Therefore, the Invocation#next() method should not be called below.
+    if (!Config.INSTANCE.isProviderEnabled()) {
+      return;
+    }
+
     String microserviceName = invocation.getContext(Const.SRC_MICROSERVICE);
     QpsController qpsController =
         StringUtils.isEmpty(microserviceName)
             ? qpsControllerMgr.getGlobalQpsController()
             : qpsControllerMgr.getOrCreate(microserviceName, invocation);
-    if (isLimitNewRequest(qpsController, asyncResp)) {
-      return;
-    }
-
-    invocation.next(asyncResp);
+    isLimitNewRequest(qpsController, asyncResp);
   }
 
   private boolean isLimitNewRequest(QpsController qpsController, AsyncResponse asyncResp) {
diff --git a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
index 88c8074..5f294ba 100644
--- a/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
+++ b/handlers/handler-flowcontrol-qps/src/main/java/org/apache/servicecomb/qps/QpsController.java
@@ -59,15 +59,13 @@ public class QpsController {
     long msNow = System.currentTimeMillis();
     //Time jump cause the new request injected
     if (msNow - msCycleBegin > CYCLE_LENGTH || msNow < msCycleBegin) {
-     
-      //no need worry about concurrency porbleam  
+      // no need to worry about concurrency problems
       lastRequestCount = newCount;
       msCycleBegin = msNow;
     }
 
     // Configuration update and use is at the situation of multi-threaded concurrency
     // It is possible that operation level updated to null,but schema level or microservice level does not updated
-    
     int limitValue = (qpsLimit == null) ? Integer.MAX_VALUE : qpsLimit;
     return newCount - lastRequestCount >= limitValue;
   }
diff --git a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
index f671eea..9f73155 100644
--- a/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
+++ b/handlers/handler-flowcontrol-qps/src/test/java/org/apache/servicecomb/qps/TestProviderQpsFlowControlHandler.java
@@ -18,6 +18,8 @@
 package org.apache.servicecomb.qps;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.times;
 
 import org.apache.servicecomb.core.Const;
@@ -59,7 +61,6 @@ public class TestProviderQpsFlowControlHandler {
     ArchaiusUtils.setProperty(Config.PROVIDER_LIMIT_KEY_PREFIX + "test", 1);
   }
 
-
   @After
   public void afterTest() {
     ArchaiusUtils.resetConfig();
@@ -71,6 +72,8 @@ public class TestProviderQpsFlowControlHandler {
       final @Injectable AsyncResponse asyncResp) throws Exception {
     new Expectations() {
       {
+        invocation.getHandlerIndex();
+        result = 0;
         invocation.getContext(Const.SRC_MICROSERVICE);
         result = "test";
         invocation.getOperationMeta();
@@ -95,25 +98,42 @@ public class TestProviderQpsFlowControlHandler {
     gHandler.handle(invocation, asyncResp);
   }
 
-
   @Test
   public void testQpsController() {
     mockUpSystemTime();
     QpsController qpsController = new QpsController("abc", 100);
-    assertEquals(false, qpsController.isLimitNewRequest());
+    assertFalse(qpsController.isLimitNewRequest());
 
     qpsController.setQpsLimit(1);
-    assertEquals(true, qpsController.isLimitNewRequest());
+    assertTrue(qpsController.isLimitNewRequest());
   }
 
   @Test
   public void testHandleOnSourceMicroserviceNameIsNull() throws Exception {
     Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null);
+    // the qps logic only runs when the handler index is <= 0
+    Mockito.when(invocation.getHandlerIndex()).thenReturn(0);
+    ArchaiusUtils.setProperty("servicecomb.flowcontrol.Provider.qps.global.limit", 1);
+    ProviderQpsFlowControlHandler.qpsControllerMgr
+        .setGlobalQpsController("servicecomb.flowcontrol.Provider.qps.global.limit");
+
+    handler.handle(invocation, asyncResp);
+    handler.handle(invocation, asyncResp);
+
+    // Invocation#getContext(String) is only invoked when the qps logic runs
+    Mockito.verify(invocation, times(2)).getContext(Const.SRC_MICROSERVICE);
+    Mockito.verify(asyncResp, times(1)).producerFail(Mockito.any(Exception.class));
+  }
+
+  @Test
+  public void testHandleOnSourceOnHandlerIndexIsGreaterThan0() throws Exception {
+    Mockito.when(invocation.getContext(Const.SRC_MICROSERVICE)).thenReturn(null);
 
+    Mockito.when(invocation.getHandlerIndex()).thenReturn(1);
     handler.handle(invocation, asyncResp);
     handler.handle(invocation, asyncResp);
 
-    Mockito.verify(invocation, times(2)).next(asyncResp);
+    Mockito.verify(invocation, times(0)).getContext(Mockito.anyString());
   }
 
   @Test
@@ -122,25 +142,19 @@ public class TestProviderQpsFlowControlHandler {
     OperationMeta mockOperationMeta = QpsControllerManagerTest.getMockOperationMeta("pojo", "server", "opr");
     Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta);
     Mockito.when(invocation.getSchemaId()).thenReturn("server");
-    new MockUp<QpsController>() {
-      @Mock
-      public boolean isLimitNewRequest() {
-        return true;
-      }
-    };
 
     new MockUp<QpsControllerManager>() {
-
       @Mock
       protected QpsController create(String qualifiedNameKey) {
-        return new QpsController(qualifiedNameKey, 12);
+        return new QpsController(qualifiedNameKey, 1);
       }
     };
 
     handler.handle(invocation, asyncResp);
+    handler.handle(invocation, asyncResp);
 
     ArgumentCaptor<InvocationException> captor = ArgumentCaptor.forClass(InvocationException.class);
-    Mockito.verify(asyncResp).producerFail(captor.capture());
+    Mockito.verify(asyncResp, times(1)).producerFail(captor.capture());
 
     InvocationException invocationException = captor.getValue();
     assertEquals(QpsConst.TOO_MANY_REQUESTS_STATUS, invocationException.getStatus());
@@ -155,23 +169,17 @@ public class TestProviderQpsFlowControlHandler {
         .getMockOperationMeta("pojo", "server", "opr");
     Mockito.when(invocation.getOperationMeta()).thenReturn(mockOperationMeta);
     Mockito.when(invocation.getSchemaId()).thenReturn("server");
-    new MockUp<QpsController>() {
-      @Mock
-      public boolean isLimitNewRequest() {
-        return false;
-      }
-    };
 
     new MockUp<QpsControllerManager>() {
-
       @Mock
       protected QpsController create(String qualifiedNameKey) {
-        return new QpsController(qualifiedNameKey, 12);
+        return new QpsController(qualifiedNameKey, 1);
       }
     };
     handler.handle(invocation, asyncResp);
 
-    Mockito.verify(invocation).next(asyncResp);
+    Mockito.verify(invocation, times(0)).next(asyncResp);
+    Mockito.verify(asyncResp, times(0)).producerFail(Mockito.any(Exception.class));
   }
 
   private void mockUpSystemTime() {