You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/06/04 06:14:20 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1981] add filter chain to SCBEngine

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e39c91  [SCB-1981] add filter chain to SCBEngine
6e39c91 is described below

commit 6e39c91628abe8ef970d83b227bc97a2aa4df6e5
Author: wujimin <wu...@huawei.com>
AuthorDate: Wed Jun 3 15:45:46 2020 +0800

    [SCB-1981] add filter chain to SCBEngine
---
 .../org/apache/servicecomb/core/BootListener.java  | 16 ++++
 .../servicecomb/core/CseApplicationListener.java   |  2 +
 .../org/apache/servicecomb/core/SCBEngine.java     | 21 +++++
 .../core/bootstrap/SCBEngineForTest.java           |  8 +-
 .../core/definition/MicroserviceMeta.java          | 11 +++
 .../core/definition/ServiceRegistryListener.java   |  3 +-
 .../IllegalArgumentExceptionConverter.java         |  6 --
 .../core/filter/FilterChainsManager.java           |  4 +
 .../core/filter/impl/ProducerOperationFilter.java  | 10 ++-
 .../core/invocation/ProducerInvocationFlow.java    |  5 +-
 core/src/main/resources/microservice.yaml          | 21 +++++
 .../filter/impl/ProducerOperationFilterTest.java   |  3 +-
 .../invocation/ProducerInvocationFlowTest.java     | 97 ++++++++++++----------
 13 files changed, 147 insertions(+), 60 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/BootListener.java b/core/src/main/java/org/apache/servicecomb/core/BootListener.java
index 0fcde52..cb740a2 100644
--- a/core/src/main/java/org/apache/servicecomb/core/BootListener.java
+++ b/core/src/main/java/org/apache/servicecomb/core/BootListener.java
@@ -21,6 +21,8 @@ public interface BootListener {
   enum EventType {
     BEFORE_HANDLER,
     AFTER_HANDLER,
+    BEFORE_FILTER,
+    AFTER_FILTER,
     BEFORE_PRODUCER_PROVIDER,
     AFTER_PRODUCER_PROVIDER,
     BEFORE_CONSUMER_PROVIDER,
@@ -67,6 +69,12 @@ public interface BootListener {
       case AFTER_HANDLER:
         onAfterHandler(event);
         return;
+      case BEFORE_FILTER:
+        onBeforeFilter(event);
+        return;
+      case AFTER_FILTER:
+        onAfterFilter(event);
+        return;
       case BEFORE_PRODUCER_PROVIDER:
         onBeforeProducerProvider(event);
         return;
@@ -110,6 +118,14 @@ public interface BootListener {
 
   }
 
+  default void onBeforeFilter(BootEvent event) {
+
+  }
+
+  default void onAfterFilter(BootEvent event) {
+
+  }
+
   default void onBeforeProducerProvider(BootEvent event) {
 
   }
diff --git a/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java b/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java
index 5da2045..e880d2c 100644
--- a/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java
+++ b/core/src/main/java/org/apache/servicecomb/core/CseApplicationListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.servicecomb.core;
 
+import org.apache.servicecomb.core.filter.FilterChainsManager;
 import org.apache.servicecomb.foundation.common.utils.BeanUtils;
 import org.apache.servicecomb.foundation.vertx.client.http.HttpClients;
 import org.apache.servicecomb.registry.DiscoveryManager;
@@ -75,6 +76,7 @@ public class CseApplicationListener
 //        SCBEngine.getInstance().setProducerProviderManager(applicationContext.getBean(ProducerProviderManager.class));
 //        SCBEngine.getInstance().setConsumerProviderManager(applicationContext.getBean(ConsumerProviderManager.class));
 //        SCBEngine.getInstance().setTransportManager(applicationContext.getBean(TransportManager.class));
+      scbEngine.setFilterChainsManager(applicationContext.getBean(FilterChainsManager.class));
       scbEngine.getConsumerProviderManager().getConsumerProviderList()
           .addAll(applicationContext.getBeansOfType(ConsumerProvider.class).values());
       scbEngine.getProducerProviderManager().getProducerProviderList()
diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
index 06a0796..cec49d6 100644
--- a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
+++ b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
@@ -39,6 +39,7 @@ import org.apache.servicecomb.core.definition.ServiceRegistryListener;
 import org.apache.servicecomb.core.event.InvocationFinishEvent;
 import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.executor.ExecutorManager;
+import org.apache.servicecomb.core.filter.FilterChainsManager;
 import org.apache.servicecomb.core.handler.ConsumerHandlerManager;
 import org.apache.servicecomb.core.handler.HandlerConfigUtils;
 import org.apache.servicecomb.core.handler.ProducerHandlerManager;
@@ -86,6 +87,8 @@ public class SCBEngine {
 
   private volatile static SCBEngine INSTANCE;
 
+  private FilterChainsManager filterChainsManager;
+
   private ConsumerHandlerManager consumerHandlerManager = new ConsumerHandlerManager();
 
   private ProducerHandlerManager producerHandlerManager = new ProducerHandlerManager();
@@ -166,6 +169,19 @@ public class SCBEngine {
     return RegistrationManager.INSTANCE.getSwaggerLoader();
   }
 
+  public FilterChainsManager getFilterChainsManager() {
+    return filterChainsManager;
+  }
+
+  public SCBEngine setFilterChainsManager(FilterChainsManager filterChainsManager) {
+    this.filterChainsManager = filterChainsManager;
+    return this;
+  }
+
+  public boolean isFilterChainEnabled() {
+    return filterChainsManager.isEnabled();
+  }
+
   public ConsumerHandlerManager getConsumerHandlerManager() {
     return consumerHandlerManager;
   }
@@ -327,6 +343,10 @@ public class SCBEngine {
     HandlerConfigUtils.init(consumerHandlerManager, producerHandlerManager);
     triggerEvent(EventType.AFTER_HANDLER);
 
+    triggerEvent(EventType.BEFORE_FILTER);
+    filterChainsManager.init(this);
+    triggerEvent(EventType.AFTER_FILTER);
+
     createProducerMicroserviceMeta();
 
     triggerEvent(EventType.BEFORE_PRODUCER_PROVIDER);
@@ -357,6 +377,7 @@ public class SCBEngine {
 
     producerMicroserviceMeta = new MicroserviceMeta(this, microserviceName, false);
     producerMicroserviceMeta.setHandlerChain(producerHandlerManager.getOrCreate(microserviceName));
+    producerMicroserviceMeta.setFilterChain(filterChainsManager.createProducerFilterChain(microserviceName));
     producerMicroserviceMeta.setMicroserviceVersionsMeta(new MicroserviceVersionsMeta(this, microserviceName));
   }
 
diff --git a/core/src/main/java/org/apache/servicecomb/core/bootstrap/SCBEngineForTest.java b/core/src/main/java/org/apache/servicecomb/core/bootstrap/SCBEngineForTest.java
index 3ccf530..2a55597 100644
--- a/core/src/main/java/org/apache/servicecomb/core/bootstrap/SCBEngineForTest.java
+++ b/core/src/main/java/org/apache/servicecomb/core/bootstrap/SCBEngineForTest.java
@@ -21,6 +21,9 @@ import static org.apache.servicecomb.core.executor.ExecutorManager.EXECUTOR_GROU
 
 import org.apache.servicecomb.core.SCBEngine;
 import org.apache.servicecomb.core.executor.GroupExecutor;
+import org.apache.servicecomb.core.filter.FilterChainsManager;
+import org.apache.servicecomb.core.filter.FilterManager;
+import org.apache.servicecomb.core.filter.config.TransportFiltersConfig;
 import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.common.event.SimpleEventBus;
 import org.apache.servicecomb.foundation.common.utils.ReflectUtils;
@@ -31,6 +34,9 @@ import org.apache.servicecomb.foundation.common.utils.ReflectUtils;
 public class SCBEngineForTest extends SCBEngine {
   public SCBEngineForTest() {
     getExecutorManager().registerExecutor(EXECUTOR_GROUP_THREADPOOL, new GroupExecutor().init());
+    setFilterChainsManager(new FilterChainsManager()
+        .setTransportFiltersConfig(new TransportFiltersConfig())
+        .setFilterManager(new FilterManager()));
   }
 
   @Override
@@ -38,7 +44,7 @@ public class SCBEngineForTest extends SCBEngine {
     super.destroy();
 
     ReflectUtils.setField(SCBEngine.class, null, "INSTANCE", null);
-    
+
     EventManager.eventBus = new SimpleEventBus();
   }
 }
diff --git a/core/src/main/java/org/apache/servicecomb/core/definition/MicroserviceMeta.java b/core/src/main/java/org/apache/servicecomb/core/definition/MicroserviceMeta.java
index 74568e2..3cebb69 100644
--- a/core/src/main/java/org/apache/servicecomb/core/definition/MicroserviceMeta.java
+++ b/core/src/main/java/org/apache/servicecomb/core/definition/MicroserviceMeta.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.servicecomb.core.Handler;
 import org.apache.servicecomb.core.SCBEngine;
+import org.apache.servicecomb.core.filter.FilterNode;
 import org.apache.servicecomb.foundation.common.VendorExtensions;
 import org.apache.servicecomb.registry.definition.MicroserviceNameParser;
 import org.apache.servicecomb.swagger.SwaggerUtils;
@@ -66,6 +67,8 @@ public class MicroserviceMeta {
 
   private List<Handler> handlerChain = Collections.singletonList((invocation, ar) -> ar.success(null));
 
+  private FilterNode filterChain = FilterNode.EMPTY;
+
   // providerQpsFlowControlHandler is a temporary field, only for internal usage
   private Handler providerQpsFlowControlHandler;
 
@@ -197,6 +200,14 @@ public class MicroserviceMeta {
     this.handlerChain = handlerChain;
   }
 
+  public FilterNode getFilterChain() {
+    return filterChain;
+  }
+
+  public void setFilterChain(FilterNode filterChain) {
+    this.filterChain = filterChain;
+  }
+
   /**
    * Only for JavaChassis internal usage.
    */
diff --git a/core/src/main/java/org/apache/servicecomb/core/definition/ServiceRegistryListener.java b/core/src/main/java/org/apache/servicecomb/core/definition/ServiceRegistryListener.java
index 659921e..69b09d3 100644
--- a/core/src/main/java/org/apache/servicecomb/core/definition/ServiceRegistryListener.java
+++ b/core/src/main/java/org/apache/servicecomb/core/definition/ServiceRegistryListener.java
@@ -74,9 +74,10 @@ public class ServiceRegistryListener {
 
     // not shortName, to support cross app invoke
     String microserviceName = microserviceVersion.getMicroserviceName();
-
     MicroserviceMeta microserviceMeta = new MicroserviceMeta(scbEngine, microserviceName, true);
     microserviceMeta.setHandlerChain(scbEngine.getConsumerHandlerManager().getOrCreate(microserviceName));
+    microserviceMeta.setFilterChain(scbEngine.getFilterChainsManager().createConsumerFilterChain(microserviceName));
+
     MicroserviceVersions microserviceVersions = microserviceVersion.getMicroserviceVersions();
     microserviceMeta.setMicroserviceVersionsMeta(getMicroserviceVersionsMeta(microserviceVersions));
 
diff --git a/core/src/main/java/org/apache/servicecomb/core/exception/converter/IllegalArgumentExceptionConverter.java b/core/src/main/java/org/apache/servicecomb/core/exception/converter/IllegalArgumentExceptionConverter.java
index 2e3ccb9..e4187da 100644
--- a/core/src/main/java/org/apache/servicecomb/core/exception/converter/IllegalArgumentExceptionConverter.java
+++ b/core/src/main/java/org/apache/servicecomb/core/exception/converter/IllegalArgumentExceptionConverter.java
@@ -22,12 +22,8 @@ import javax.ws.rs.core.Response.StatusType;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.exception.ExceptionConverter;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class IllegalArgumentExceptionConverter implements ExceptionConverter<IllegalArgumentException> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(IllegalArgumentExceptionConverter.class);
-
   @Override
   public int getOrder() {
     return Short.MAX_VALUE;
@@ -41,8 +37,6 @@ public class IllegalArgumentExceptionConverter implements ExceptionConverter<Ill
   @Override
   public InvocationException convert(@Nullable Invocation invocation, IllegalArgumentException throwable,
       StatusType genericStatus) {
-    LOGGER.error("convert IllegalArgumentException exception to InvocationException.", throwable);
-
     return new InvocationException(genericStatus, ExceptionConverter.getGenericCode(genericStatus),
         "Parameters not valid or types not match.", throwable);
   }
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/FilterChainsManager.java b/core/src/main/java/org/apache/servicecomb/core/filter/FilterChainsManager.java
index c2d1e31..2a8f555 100644
--- a/core/src/main/java/org/apache/servicecomb/core/filter/FilterChainsManager.java
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/FilterChainsManager.java
@@ -131,6 +131,10 @@ public class FilterChainsManager {
   }
 
   private FilterNode createFilterNode(FilterChainsConfig chainsConfig, String microservice) {
+    if (!enabled) {
+      return FilterNode.EMPTY;
+    }
+
     List<Filter> filters = createFilters(chainsConfig, microservice);
     return FilterNode.buildChain(filters);
   }
diff --git a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilter.java b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilter.java
index 8819522..09c6238 100644
--- a/core/src/main/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilter.java
+++ b/core/src/main/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilter.java
@@ -75,8 +75,11 @@ public class ProducerOperationFilter implements Filter {
   }
 
   private void whenComplete(Invocation invocation, Throwable throwable) {
-    if (shouldPrintErrorLog(throwable)) {
-      LOGGER.error("unexpected error {},", invocation.getInvocationQualifiedName(), throwable);
+    if (throwable != null) {
+      Throwable unwrapped = Exceptions.unwrap(throwable);
+      if (shouldPrintErrorLog(unwrapped)) {
+        LOGGER.error("unexpected error, invocation={},", invocation.getInvocationQualifiedName(), unwrapped);
+      }
     }
 
     invocation.onBusinessMethodFinish();
@@ -88,7 +91,6 @@ public class ProducerOperationFilter implements Filter {
       return false;
     }
 
-    Throwable unwrapped = Exceptions.unwrap(throwable);
-    return !(unwrapped instanceof InvocationException);
+    return !(throwable instanceof InvocationException);
   }
 }
diff --git a/core/src/main/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlow.java b/core/src/main/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlow.java
index 2625838..0d0669f 100644
--- a/core/src/main/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlow.java
+++ b/core/src/main/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlow.java
@@ -20,7 +20,6 @@ import java.util.concurrent.CompletableFuture;
 
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.exception.Exceptions;
-import org.apache.servicecomb.core.filter.FilterNode;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
 import org.apache.servicecomb.swagger.invocation.Response;
@@ -62,7 +61,7 @@ public abstract class ProducerInvocationFlow {
     }
 
     invocation.onStart(requestEx, startTime);
-    getOrCreateFilterChain(invocation)
+    invocation.getMicroserviceMeta().getFilterChain()
         .onFilter(invocation)
         .whenComplete((response, Throwable) -> sendResponse(invocation, response))
         .whenComplete((response, Throwable) -> finishInvocation(invocation, response, Throwable));
@@ -85,8 +84,6 @@ public abstract class ProducerInvocationFlow {
         invocation.getMicroserviceQualifiedName(), requestEx.getRequestURI(), throwable);
   }
 
-  protected abstract FilterNode getOrCreateFilterChain(Invocation invocation);
-
   protected abstract Invocation sendCreateInvocationException(Throwable throwable);
 
   protected abstract void sendResponse(Invocation invocation, Response response);
diff --git a/core/src/main/resources/microservice.yaml b/core/src/main/resources/microservice.yaml
index c580fe1..f3b9fa3 100644
--- a/core/src/main/resources/microservice.yaml
+++ b/core/src/main/resources/microservice.yaml
@@ -17,3 +17,24 @@
 
 servicecomb-config-order: -500
 
+servicecomb:
+  filter-chains:
+    enabled: false
+    transport-filters:
+      #default-consumer-transport:
+      #  rest: rest-client-codec
+      #  highway: highway-client-codec
+      default-producer-tranport:
+        rest: rest-server-codec
+        highway: highway-server-codec
+    consumer:
+      default: simple-load-balance
+      #default: simple-load-balance, default-consumer-transport, transport-client
+      # samples for customize microservice filter chain
+      #policies:
+      #  ms-1: retry, load-balance, transport-client, ms-1-consumer-transport
+    producer:
+      default: default-producer-tranport, schedule, producer-operation
+      # examples of customizing the filter chain for a specific microservice
+      #policies:
+      #  ms-1: qps-limiter, ms-1-producer-transport, schedule, producer-operation
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilterTest.java b/core/src/test/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilterTest.java
index 0ed0c4b..fd932f1 100644
--- a/core/src/test/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilterTest.java
+++ b/core/src/test/java/org/apache/servicecomb/core/filter/impl/ProducerOperationFilterTest.java
@@ -72,7 +72,8 @@ public class ProducerOperationFilterTest {
   }
 
   @Test
-  public void should_record_invocation_trace_time() {
+  public void should_record_invocation_trace_time() throws NoSuchMethodException {
+    setInvokeSyncMethod();
     filter.onFilter(invocation, FilterNode.EMPTY);
 
     new Verifications() {
diff --git a/core/src/test/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlowTest.java b/core/src/test/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlowTest.java
index dca45b2..b31f6fa 100644
--- a/core/src/test/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlowTest.java
+++ b/core/src/test/java/org/apache/servicecomb/core/invocation/ProducerInvocationFlowTest.java
@@ -19,9 +19,8 @@ package org.apache.servicecomb.core.invocation;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.exception.Exceptions;
 import org.apache.servicecomb.core.filter.FilterNode;
 import org.apache.servicecomb.foundation.test.scaffolding.exception.RuntimeExceptionWithoutStackTrace;
@@ -29,9 +28,10 @@ import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.junit.Test;
 
+import mockit.Expectations;
 import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Verifications;
 
 public class ProducerInvocationFlowTest {
   class TestFlow extends ProducerInvocationFlow {
@@ -40,11 +40,6 @@ public class ProducerInvocationFlowTest {
     }
 
     @Override
-    protected FilterNode getOrCreateFilterChain(Invocation invocation) {
-      return filterNode;
-    }
-
-    @Override
     protected Invocation sendCreateInvocationException(Throwable throwable) {
       sendException = throwable;
       return null;
@@ -62,6 +57,12 @@ public class ProducerInvocationFlowTest {
 
   Invocation sendInvocation;
 
+  @Injectable
+  Invocation invocation;
+
+  @Mocked
+  MicroserviceMeta microserviceMeta;
+
   @Test
   public void should_send_exception_response_when_failed_to_create_invocation() {
     RuntimeException exception = new RuntimeExceptionWithoutStackTrace();
@@ -74,23 +75,33 @@ public class ProducerInvocationFlowTest {
     assertThat(Exceptions.unwrap(sendException)).isSameAs(exception);
   }
 
-  @Test
-  public void should_start_invocation_when_succeed_to_create_invocation(@Injectable Invocation invocation) {
-    TestFlow flow = new TestFlow(() -> invocation);
-    AtomicLong startTime = new AtomicLong();
-    new MockUp<Invocation>() {
-      @Mock
-      void onStart(HttpServletRequestEx requestEx, long start) {
-        startTime.set(start);
+  private void mockFilterChain() {
+    new Expectations() {
+      {
+        microserviceMeta.getFilterChain();
+        result = filterNode;
       }
     };
+  }
+
+  @Test
+  public void should_start_invocation_when_succeed_to_create_invocation() {
+    mockFilterChain();
+    TestFlow flow = new TestFlow(() -> invocation);
+
     flow.run();
 
-    assertThat(startTime.get()).isNotEqualTo(0);
+    new Verifications() {
+      {
+        invocation.onStart((HttpServletRequestEx) any, anyLong);
+        times = 1;
+      }
+    };
   }
 
   @Test
-  public void should_send_response_when_invocation_success(@Injectable Invocation invocation) {
+  public void should_send_response_when_invocation_success() {
+    mockFilterChain();
     TestFlow flow = new TestFlow(() -> invocation);
 
     flow.run();
@@ -99,27 +110,31 @@ public class ProducerInvocationFlowTest {
   }
 
   @Test
-  public void should_finish_invocation_when_invocation_success(@Injectable Invocation invocation) {
+  public void should_finish_invocation_when_invocation_success() {
+    mockFilterChain();
     TestFlow flow = new TestFlow(() -> invocation);
-    AtomicLong finishTime = new AtomicLong();
-    new MockUp<Invocation>() {
-      @Mock
-      void onFinish(Response response) {
-        finishTime.set(1);
-      }
-    };
 
     flow.run();
 
-    assertThat(finishTime.get()).isEqualTo(1);
+    new Verifications() {
+      {
+        invocation.onFinish((Response) any);
+        times = 1;
+      }
+    };
   }
 
-  @Test
-  public void should_send_response_when_invocation_fail(@Injectable Invocation invocation) {
-    TestFlow flow = new TestFlow(() -> invocation);
+  private void mockInvocationFailed() {
     filterNode = new FilterNode((_invocation, _node) -> {
       throw new RuntimeExceptionWithoutStackTrace();
     });
+    mockFilterChain();
+  }
+
+  @Test
+  public void should_send_response_when_invocation_fail() {
+    mockInvocationFailed();
+    TestFlow flow = new TestFlow(() -> invocation);
 
     flow.run();
 
@@ -127,21 +142,17 @@ public class ProducerInvocationFlowTest {
   }
 
   @Test
-  public void should_finish_invocation_when_invocation_fail(@Injectable Invocation invocation) {
+  public void should_finish_invocation_when_invocation_fail() {
+    mockInvocationFailed();
     TestFlow flow = new TestFlow(() -> invocation);
-    filterNode = new FilterNode((_invocation, _node) -> {
-      throw new RuntimeExceptionWithoutStackTrace();
-    });
-    AtomicLong finishTime = new AtomicLong();
-    new MockUp<Invocation>() {
-      @Mock
-      void onFinish(Response response) {
-        finishTime.set(1);
-      }
-    };
 
     flow.run();
 
-    assertThat(finishTime.get()).isEqualTo(1);
+    new Verifications() {
+      {
+        invocation.onFinish((Response) any);
+        times = 1;
+      }
+    };
   }
 }
\ No newline at end of file