You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/03/15 11:12:16 UTC

[GitHub] liubao68 closed pull request #585: [SCB-374] Invocation publish life event

liubao68 closed pull request #585: [SCB-374] Invocation publish life event
URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/585
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 7d8ab2c76..6e6f87631 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -37,12 +37,9 @@
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.OperationMeta;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
@@ -109,12 +106,10 @@ public String getContext(String key) {
   }
 
   protected void scheduleInvocation() {
+    createInvocation();
+    invocation.onStart();
     OperationMeta operationMeta = restOperationMeta.getOperationMeta();
 
-    InvocationStartedEvent startedEvent = new InvocationStartedEvent(operationMeta.getMicroserviceQualifiedName(),
-        InvocationType.PRODUCER, System.nanoTime());
-    EventBus.getInstance().triggerEvent(startedEvent);
-
     operationMeta.getExecutor().execute(() -> {
       synchronized (this.requestEx) {
         try {
@@ -127,7 +122,7 @@ protected void scheduleInvocation() {
             return;
           }
 
-          runOnExecutor(startedEvent);
+          runOnExecutor();
         } catch (Throwable e) {
           LOGGER.error("rest server onRequest error", e);
           sendFailResponse(e);
@@ -136,19 +131,16 @@ protected void scheduleInvocation() {
     });
   }
 
-  protected void runOnExecutor(InvocationStartedEvent startedEvent) {
-    createInvocation(null);
-
-    //立刻设置开始时间,否则Finished时无法计算TotalTime
-    invocation.setStartTime(startedEvent.getStartedTime());
-    invocation.triggerStartExecutionEvent();
+  protected void runOnExecutor() {
+    invocation.onStartExecute();
 
     invoke();
   }
 
   protected abstract OperationLocator locateOperation(ServicePathManager servicePathManager);
 
-  protected abstract void createInvocation(Object[] args);
+  // create an invocation without its args set
+  protected abstract void createInvocation();
 
   public void invoke() {
     try {
@@ -184,8 +176,6 @@ protected Response prepareInvoke() throws Throwable {
   protected void doInvoke() throws Throwable {
     invocation.next(resp -> {
       sendResponseQuietly(resp);
-
-      invocation.triggerFinishedEvent(resp.getStatusCode());
     });
   }
 
@@ -207,6 +197,7 @@ protected void sendResponseQuietly(Response response) {
           e);
     } finally {
       requestEx.getAsyncContext().complete();
+      invocation.onFinish(response);
     }
   }
 
diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java
index cd5e5f3e3..2e0cbe92c 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java
@@ -50,6 +50,7 @@ public void invoke(Transport transport, HttpServletRequestEx requestEx, HttpServ
       return;
     }
 
+    createInvocation();
     scheduleInvocation();
   }
 
@@ -70,9 +71,9 @@ protected OperationLocator locateOperation(ServicePathManager servicePathManager
   }
 
   @Override
-  protected void createInvocation(Object[] args) {
+  protected void createInvocation() {
     this.invocation = InvocationFactory.forProvider(transport.getEndpoint(),
         restOperationMeta.getOperationMeta(),
-        args);
+        null);
   }
 }
diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index aeac2ec36..291c7317b 100644
--- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -39,27 +39,33 @@
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.executor.ReactiveExecutor;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
 import org.apache.servicecomb.core.provider.consumer.ReferenceConfig;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
 import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
 import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletRequest;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.swagger.invocation.response.Headers;
 import org.hamcrest.Matchers;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
 import io.vertx.core.buffer.Buffer;
 import mockit.Deencapsulation;
 import mockit.Expectations;
@@ -104,13 +110,23 @@ protected OperationLocator locateOperation(ServicePathManager servicePathManager
     }
 
     @Override
-    protected void createInvocation(Object[] args) {
+    protected void createInvocation() {
       this.invocation = TestAbstractRestInvocation.this.invocation;
     }
   }
 
   AbstractRestInvocation restInvocation = new AbstractRestInvocationForTest();
 
+  @BeforeClass
+  public static void classSetup() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  @AfterClass
+  public static void classTeardown() {
+    EventManager.eventBus = new EventBus();
+  }
+
   @Before
   public void setup() {
     invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
@@ -369,6 +385,15 @@ protected void sendResponseQuietly(Response response) {
 
   @Test
   public void sendResponseQuietlyNormal(@Mocked Response response) {
+    Holder<InvocationFinishEvent> eventHolder = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onFinished(InvocationFinishEvent event) {
+        eventHolder.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
     Holder<Response> result = new Holder<>();
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
@@ -384,6 +409,9 @@ protected void sendResponse(Response response) {
 
     restInvocation.sendResponseQuietly(response);
 
+    EventManager.unregister(subscriber);
+
+    Assert.assertSame(invocation, eventHolder.value.getInvocation());
     Assert.assertSame(response, result.value);
   }
 
@@ -636,8 +664,6 @@ public void scheduleInvocationException(@Mocked OperationMeta operationMeta) {
         result = operationMeta;
         operationMeta.getExecutor();
         result = executor;
-        operationMeta.getMicroserviceQualifiedName();
-        result = "sayHi";
       }
     };
 
@@ -645,7 +671,7 @@ public void scheduleInvocationException(@Mocked OperationMeta operationMeta) {
     Error error = new Error("run on executor");
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
-      protected void runOnExecutor(InvocationStartedEvent startedEvent) {
+      protected void runOnExecutor() {
         throw error;
       }
 
@@ -682,7 +708,7 @@ public void scheduleInvocationTimeout(@Mocked OperationMeta operationMeta) {
 
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
-      protected void runOnExecutor(InvocationStartedEvent startedEvent) {
+      protected void runOnExecutor() {
         throw new Error("run on executor");
       }
 
@@ -700,6 +726,22 @@ public void sendFailResponse(Throwable throwable) {
 
   @Test
   public void scheduleInvocationNormal(@Mocked OperationMeta operationMeta) {
+    long time = 123;
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return time;
+      }
+    };
+    Holder<InvocationStartEvent> eventHolder = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationStartEvent event) {
+        eventHolder.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
     Executor executor = new ReactiveExecutor();
     requestEx = new AbstractHttpServletRequest() {
     };
@@ -710,15 +752,13 @@ public void scheduleInvocationNormal(@Mocked OperationMeta operationMeta) {
         result = operationMeta;
         operationMeta.getExecutor();
         result = executor;
-        operationMeta.getMicroserviceQualifiedName();
-        result = "sayHi";
       }
     };
 
     Holder<Boolean> result = new Holder<>();
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
-      protected void runOnExecutor(InvocationStartedEvent startedEvent) {
+      protected void runOnExecutor() {
         result.value = true;
       }
     };
@@ -726,12 +766,23 @@ protected void runOnExecutor(InvocationStartedEvent startedEvent) {
     restInvocation.restOperationMeta = restOperation;
 
     restInvocation.scheduleInvocation();
+    EventManager.unregister(subscriber);
 
     Assert.assertTrue(result.value);
+    Assert.assertEquals(time, invocation.getStartTime());
+    Assert.assertSame(invocation, eventHolder.value.getInvocation());
   }
 
   @Test
   public void runOnExecutor() {
+    long time = 123;
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return time;
+      }
+    };
+
     Holder<Boolean> result = new Holder<>();
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
@@ -739,12 +790,15 @@ public void invoke() {
         result.value = true;
       }
     };
+    restInvocation.createInvocation();
     restInvocation.requestEx = requestEx;
     restInvocation.restOperationMeta = restOperation;
 
-    restInvocation.runOnExecutor(new InvocationStartedEvent("", InvocationType.PRODUCER, System.nanoTime()));
+    restInvocation.runOnExecutor();
+
     Assert.assertTrue(result.value);
     Assert.assertSame(invocation, restInvocation.invocation);
+    Assert.assertEquals(time, invocation.getStartExecutionTime());
   }
 
   @Test
diff --git a/core/src/main/java/org/apache/servicecomb/core/Invocation.java b/core/src/main/java/org/apache/servicecomb/core/Invocation.java
index d26c903f1..369f07142 100644
--- a/core/src/main/java/org/apache/servicecomb/core/Invocation.java
+++ b/core/src/main/java/org/apache/servicecomb/core/Invocation.java
@@ -24,12 +24,16 @@
 
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
-import org.apache.servicecomb.core.metrics.InvocationFinishedEvent;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.metrics.InvocationStartExecutionEvent;
+import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
 import org.apache.servicecomb.core.provider.consumer.ReferenceConfig;
 import org.apache.servicecomb.foundation.common.event.EventBus;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.apache.servicecomb.swagger.invocation.InvocationType;
+import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.SwaggerInvocation;
 
 public class Invocation extends SwaggerInvocation {
@@ -53,7 +57,6 @@
 
   private int handlerIndex;
 
-
   // 应答的处理器
   // 同步模式:避免应答在网络线程中处理解码等等业务级逻辑
   private Executor responseExecutor;
@@ -64,8 +67,12 @@
 
   private boolean sync = true;
 
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getStartExecutionTime() {
+    return startExecutionTime;
   }
 
   public Invocation(ReferenceConfig referenceConfig, OperationMeta operationMeta, Object[] swaggerArguments) {
@@ -185,7 +192,23 @@ public String getMicroserviceQualifiedName() {
     return operationMeta.getMicroserviceQualifiedName();
   }
 
-  public void triggerStartExecutionEvent() {
+  public void onStart() {
+    this.startTime = System.nanoTime();
+    EventManager.post(new InvocationStartEvent(this));
+
+    // old logic, needs to be deleted
+    EventBus.getInstance().triggerEvent(new InvocationStartedEvent(getMicroserviceQualifiedName(),
+        invocationType, startTime));
+  }
+
+  public void onStartExecute() {
+    this.startExecutionTime = System.nanoTime();
+
+    // old logic, needs to be deleted
+    triggerStartExecutionEvent();
+  }
+
+  private void triggerStartExecutionEvent() {
     if (InvocationType.PRODUCER.equals(invocationType)) {
       this.startExecutionTime = System.nanoTime();
       EventBus.getInstance()
@@ -193,10 +216,18 @@ public void triggerStartExecutionEvent() {
     }
   }
 
-  public void triggerFinishedEvent(int statusCode) {
+  public void onFinish(Response response) {
+    EventManager.post(new InvocationFinishEvent(this, response));
+
+    // old logic, needs to be deleted
+    triggerFinishedEvent(response.getStatusCode());
+  }
+
+  private void triggerFinishedEvent(int statusCode) {
     long finishedTime = System.nanoTime();
     EventBus.getInstance()
-        .triggerEvent(new InvocationFinishedEvent(operationMeta.getMicroserviceQualifiedName(), this.invocationType,
+        .triggerEvent(new org.apache.servicecomb.core.metrics.InvocationFinishedEvent(
+            operationMeta.getMicroserviceQualifiedName(), this.invocationType,
             startExecutionTime - startTime, finishedTime - startExecutionTime,
             finishedTime - startTime, statusCode));
   }
@@ -208,4 +239,8 @@ public boolean isSync() {
   public void setSync(boolean sync) {
     this.sync = sync;
   }
+
+  public boolean isConsumer() {
+    return InvocationType.CONSUMER.equals(invocationType);
+  }
 }
diff --git a/core/src/main/java/org/apache/servicecomb/core/event/InvocationFinishEvent.java b/core/src/main/java/org/apache/servicecomb/core/event/InvocationFinishEvent.java
new file mode 100644
index 000000000..d611f7678
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/event/InvocationFinishEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+public class InvocationFinishEvent {
+  private long nanoCurrent;
+
+  private Invocation invocation;
+
+  private Response response;
+
+  public InvocationFinishEvent(Invocation invocation, Response response) {
+    this.nanoCurrent = System.nanoTime();
+    this.invocation = invocation;
+    this.response = response;
+  }
+
+  public long getNanoCurrent() {
+    return nanoCurrent;
+  }
+
+  public Invocation getInvocation() {
+    return invocation;
+  }
+
+  public Response getResponse() {
+    return response;
+  }
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/event/InvocationStartEvent.java b/core/src/main/java/org/apache/servicecomb/core/event/InvocationStartEvent.java
new file mode 100644
index 000000000..5b93e66d0
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/event/InvocationStartEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+
+public class InvocationStartEvent {
+  private Invocation invocation;
+
+  public InvocationStartEvent(Invocation invocation) {
+    super();
+    this.invocation = invocation;
+  }
+
+  public Invocation getInvocation() {
+    return invocation;
+  }
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
index f15c6bb6d..7dc01f3a1 100644
--- a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
+++ b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
@@ -20,10 +20,7 @@
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.SchemaMeta;
 import org.apache.servicecomb.core.invocation.InvocationFactory;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.context.ContextUtils;
 import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
@@ -60,30 +57,30 @@ public static Object syncInvoke(Invocation invocation) throws InvocationExceptio
   }
 
   public static Response innerSyncInvoke(Invocation invocation) {
-    int statusCode = 0;
     try {
-      triggerStartedEvent(invocation);
+      invocation.onStart();
       SyncResponseExecutor respExecutor = new SyncResponseExecutor();
       invocation.setResponseExecutor(respExecutor);
 
       invocation.next(respExecutor::setResponse);
 
       Response response = respExecutor.waitResponse();
-      statusCode = response.getStatusCode();
+      invocation.onFinish(response);
       return response;
     } catch (Throwable e) {
       String msg =
           String.format("invoke failed, %s", invocation.getOperationMeta().getMicroserviceQualifiedName());
       LOGGER.debug(msg, e);
-      return Response.createConsumerFail(e);
-    } finally {
-      invocation.triggerFinishedEvent(statusCode);
+
+      Response response = Response.createConsumerFail(e);
+      invocation.onFinish(response);
+      return response;
     }
   }
 
   public static void reactiveInvoke(Invocation invocation, AsyncResponse asyncResp) {
     try {
-      triggerStartedEvent(invocation);
+      invocation.onStart();
       invocation.setSync(false);
 
       ReactiveResponseExecutor respExecutor = new ReactiveResponseExecutor();
@@ -92,7 +89,7 @@ public static void reactiveInvoke(Invocation invocation, AsyncResponse asyncResp
       invocation.next(ar -> {
         ContextUtils.setInvocationContext(invocation.getParentContext());
         try {
-          invocation.triggerFinishedEvent(ar.getStatusCode());
+          invocation.onFinish(ar);
           asyncResp.handle(ar);
         } finally {
           ContextUtils.removeInvocationContext();
@@ -100,9 +97,10 @@ public static void reactiveInvoke(Invocation invocation, AsyncResponse asyncResp
       });
     } catch (Throwable e) {
       //if throw exception,we can use 500 for status code ?
-      invocation.triggerFinishedEvent(500);
+      Response response = Response.createConsumerFail(e);
+      invocation.onFinish(response);
       LOGGER.error("invoke failed, {}", invocation.getOperationMeta().getMicroserviceQualifiedName());
-      asyncResp.consumerFail(e);
+      asyncResp.handle(response);
     }
   }
 
@@ -110,11 +108,4 @@ public static void reactiveInvoke(Invocation invocation, AsyncResponse asyncResp
   public static Object invoke(Invocation invocation) {
     return syncInvoke(invocation);
   }
-
-  private static void triggerStartedEvent(Invocation invocation) {
-    long startTime = System.nanoTime();
-    EventBus.getInstance().triggerEvent(new InvocationStartedEvent(invocation.getMicroserviceQualifiedName(),
-        InvocationType.CONSUMER, startTime));
-    invocation.setStartTime(startTime);
-  }
 }
diff --git a/core/src/test/java/org/apache/servicecomb/core/TestInvocation.java b/core/src/test/java/org/apache/servicecomb/core/TestInvocation.java
new file mode 100644
index 000000000..49352abf1
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/TestInvocation.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core;
+
+import javax.xml.ws.Holder;
+
+import org.apache.servicecomb.core.definition.OperationMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
+import org.apache.servicecomb.core.provider.consumer.ReferenceConfig;
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class TestInvocation {
+  Invocation invocation;
+
+  @Mocked
+  Endpoint endpoint;
+
+  @Mocked
+  OperationMeta operationMeta;
+
+  @Mocked
+  Object[] swaggerArguments;
+
+  static long currentNanoTime = 123;
+
+  @BeforeClass
+  public static void classSetup() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  protected static void mockNonaTime() {
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return currentNanoTime;
+      }
+    };
+  }
+
+  @AfterClass
+  public static void classTeardown() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  @Test
+  public void onStart() {
+    mockNonaTime();
+
+    Holder<Invocation> result = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationStartEvent event) {
+        result.value = event.getInvocation();
+      }
+    };
+    EventManager.register(subscriber);
+
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    invocation.onStart();
+
+    Assert.assertEquals(currentNanoTime, result.value.getStartTime());
+    Assert.assertSame(invocation, result.value);
+
+    EventManager.unregister(subscriber);
+  }
+
+  @Test
+  public void onStartExecute() {
+    mockNonaTime();
+
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    invocation.onStartExecute();
+
+    Assert.assertEquals(currentNanoTime, invocation.getStartExecutionTime());
+  }
+
+  @Test
+  public void onFinish(@Mocked Response response) {
+    mockNonaTime();
+
+    Holder<InvocationFinishEvent> result = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationFinishEvent event) {
+        result.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    invocation.onFinish(response);
+
+    Assert.assertEquals(currentNanoTime, result.value.getNanoCurrent());
+    Assert.assertSame(invocation, result.value.getInvocation());
+    Assert.assertSame(response, result.value.getResponse());
+
+    EventManager.unregister(subscriber);
+  }
+
+  @Test
+  public void isConsumer_yes() {
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    Assert.assertFalse(invocation.isConsumer());
+  }
+
+  @Test
+  public void isConsumer_no(@Mocked ReferenceConfig referenceConfig) {
+    Invocation invocation = new Invocation(referenceConfig, operationMeta, swaggerArguments);
+    Assert.assertTrue(invocation.isConsumer());
+  }
+}
diff --git a/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationFinishEvent.java b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationFinishEvent.java
new file mode 100644
index 000000000..5fe6b83d2
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationFinishEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.Assert;
+import org.junit.Test;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class TestInvocationFinishEvent {
+  InvocationFinishEvent event;
+
+  @Test
+  public void construct(@Mocked Invocation invocation, @Mocked Response response) {
+    long time = 123;
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return time;
+      }
+    };
+
+    event = new InvocationFinishEvent(invocation, response);
+
+    Assert.assertEquals(time, event.getNanoCurrent());
+    Assert.assertSame(invocation, event.getInvocation());
+    Assert.assertSame(response, event.getResponse());
+  }
+}
diff --git a/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationStartEvent.java b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationStartEvent.java
new file mode 100644
index 000000000..9181520bc
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationStartEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import mockit.Mocked;
+
+public class TestInvocationStartEvent {
+  InvocationStartEvent event;
+
+  @Test
+  public void construct(@Mocked Invocation invocation) {
+    event = new InvocationStartEvent(invocation);
+
+    Assert.assertSame(invocation, event.getInvocation());
+  }
+}
diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
index f764d19cf..e8988876d 100644
--- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
@@ -112,7 +112,7 @@ protected OperationLocator locateOperation(ServicePathManager servicePathManager
   }
 
   @Override
-  protected void createInvocation(Object[] args) {
+  protected void createInvocation() {
     ReferenceConfig referenceConfig = new ReferenceConfig();
     referenceConfig.setMicroserviceMeta(latestMicroserviceVersionMeta.getMicroserviceMeta());
     referenceConfig.setMicroserviceVersionRule(microserviceVersionRule.getVersionRule().getVersionRule());
@@ -120,7 +120,7 @@ protected void createInvocation(Object[] args) {
 
     this.invocation = InvocationFactory.forConsumer(referenceConfig,
         restOperationMeta.getOperationMeta(),
-        args);
+        null);
     this.invocation.setResponseExecutor(new ReactiveResponseExecutor());
   }
 }
diff --git a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
index 81160b49a..5cbadbf8b 100644
--- a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
+++ b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
@@ -217,7 +217,6 @@ public void createInvocation(@Mocked MicroserviceVersionMeta microserviceVersion
     edgeInvocation.microserviceVersionRule = microserviceVersionRule;
     Deencapsulation.setField(edgeInvocation, "restOperationMeta", restOperationMeta);
 
-    Object[] args = new Object[] {};
     new Expectations(RegistryUtils.class) {
       {
         RegistryUtils.getMicroservice();
@@ -225,7 +224,7 @@ public void createInvocation(@Mocked MicroserviceVersionMeta microserviceVersion
       }
     };
 
-    edgeInvocation.createInvocation(args);
+    edgeInvocation.createInvocation();
     Invocation invocation = Deencapsulation.getField(edgeInvocation, "invocation");
     Assert.assertThat(invocation.getResponseExecutor(), Matchers.instanceOf(ReactiveResponseExecutor.class));
   }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
index 525ad57ff..fc5a1483a 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
@@ -20,7 +20,6 @@
 import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.core.invocation.InvocationFactory;
 import org.apache.servicecomb.foundation.vertx.client.tcp.TcpData;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpOutputStream;
 import org.apache.servicecomb.swagger.invocation.Response;
@@ -31,15 +30,9 @@
 import io.vertx.core.buffer.Buffer;
 
 public final class HighwayCodec {
-  private static HighwayTransport highwayTransport;
-
   private HighwayCodec() {
   }
 
-  public static void setHighwayTransport(HighwayTransport highwayTransport) {
-    HighwayCodec.highwayTransport = highwayTransport;
-  }
-
   public static TcpOutputStream encodeRequest(long msgId, Invocation invocation,
       OperationProtobuf operationProtobuf, ProtobufFeature protobufFeature) throws Exception {
     // 写header
@@ -56,17 +49,13 @@ public static TcpOutputStream encodeRequest(long msgId, Invocation invocation,
     return os;
   }
 
-  public static Invocation decodeRequest(RequestHeader header, OperationProtobuf operationProtobuf,
+  public static void decodeRequest(Invocation invocation, RequestHeader header, OperationProtobuf operationProtobuf,
       Buffer bodyBuffer, ProtobufFeature protobufFeature) throws Exception {
     WrapSchema schema = operationProtobuf.getRequestSchema();
     Object[] args = schema.readObject(bodyBuffer, protobufFeature);
 
-    Invocation invocation =
-        InvocationFactory.forProvider(highwayTransport.getEndpoint(),
-            operationProtobuf.getOperationMeta(),
-            args);
+    invocation.setSwaggerArguments(args);
     invocation.setContext(header.getContext());
-    return invocation;
   }
 
   public static RequestHeader readRequestHeader(Buffer headerBuffer,
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
index c3c0c531c..5aca854b7 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
@@ -17,18 +17,21 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
 import org.apache.servicecomb.foundation.vertx.server.TcpServer;
 import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
 
 public class HighwayServer extends TcpServer {
+  private Endpoint endpoint;
 
-  public HighwayServer(URIEndpointObject endpointObject) {
-    super(endpointObject);
+  public HighwayServer(Endpoint endpoint) {
+    super((URIEndpointObject) endpoint.getAddress());
+    this.endpoint = endpoint;
   }
 
   @Override
   protected TcpServerConnection createTcpServerConnection() {
-    return new HighwayServerConnection();
+    return new HighwayServerConnection(endpoint);
   }
 }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index a8d7452d6..79b5e02c6 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -18,6 +18,7 @@
 
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.foundation.vertx.server.TcpBufferHandler;
 import org.apache.servicecomb.foundation.vertx.server.TcpParser;
 import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
@@ -35,8 +36,14 @@
 public class HighwayServerConnection extends TcpServerConnection implements TcpBufferHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(HighwayServerConnection.class);
 
+  private Endpoint endpoint;
+
   private ProtobufFeature protobufFeature = new ProtobufFeature();
 
+  public HighwayServerConnection(Endpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+
   @Override
   public void init(NetSocket netSocket) {
     splitter = new TcpParser(this);
@@ -115,7 +122,7 @@ protected void onLogin(long msgId, RequestHeader header, Buffer bodyBuffer) {
   }
 
   protected void onRequest(long msgId, RequestHeader header, Buffer bodyBuffer) {
-    HighwayServerInvoke invoke = new HighwayServerInvoke(protobufFeature);
+    HighwayServerInvoke invoke = new HighwayServerInvoke(endpoint, protobufFeature);
     if (invoke.init(this, msgId, header, bodyBuffer)) {
       invoke.execute();
     }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 0450187c1..3dd600605 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -24,15 +24,14 @@
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
+import org.apache.servicecomb.core.invocation.InvocationFactory;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
@@ -62,11 +61,16 @@
 
   private Buffer bodyBuffer;
 
+  private Endpoint endpoint;
+
+  Invocation invocation;
+
   public HighwayServerInvoke() {
-    this(null);
+    this(null, null);
   }
 
-  public HighwayServerInvoke(ProtobufFeature protobufFeature) {
+  public HighwayServerInvoke(Endpoint endpoint, ProtobufFeature protobufFeature) {
+    this.endpoint = endpoint;
     this.protobufFeature = protobufFeature;
   }
 
@@ -107,9 +111,9 @@ private void doInit(TcpConnection connection, long msgId, RequestHeader header,
     this.bodyBuffer = bodyBuffer;
   }
 
-  private void runInExecutor(InvocationStartedEvent startedEvent) {
+  private void runInExecutor() {
     try {
-      doRunInExecutor(startedEvent);
+      doRunInExecutor();
     } catch (Throwable e) {
       String msg = String.format("handle request error, %s, msgId=%d",
           operationMeta.getMicroserviceQualifiedName(),
@@ -120,16 +124,14 @@ private void runInExecutor(InvocationStartedEvent startedEvent) {
     }
   }
 
-  private void doRunInExecutor(InvocationStartedEvent startedEvent) throws Exception {
-    Invocation invocation = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, protobufFeature);
+  private void doRunInExecutor() throws Exception {
+    invocation.onStartExecute();
+
+    HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, protobufFeature);
     invocation.getHandlerContext().put(Const.REMOTE_ADDRESS, this.connection.getNetSocket().remoteAddress());
-    //立刻设置开始时间,否则Finished时无法计算TotalTime
-    invocation.setStartTime(startedEvent.getStartedTime());
-    invocation.triggerStartExecutionEvent();
 
     invocation.next(response -> {
       sendResponse(invocation.getContext(), response);
-      invocation.triggerFinishedEvent(response.getStatusCode());
     });
   }
 
@@ -155,6 +157,8 @@ private void sendResponse(Map<String, String> context, Response response) {
           operationProtobuf.getOperationMeta().getMicroserviceQualifiedName(),
           msgId);
       LOGGER.error(msg, e);
+    } finally {
+      invocation.onFinish(response);
     }
   }
 
@@ -162,9 +166,10 @@ private void sendResponse(Map<String, String> context, Response response) {
    * start time in queue.
    */
   public void execute() {
-    InvocationStartedEvent startedEvent = new InvocationStartedEvent(operationMeta.getMicroserviceQualifiedName(),
-        InvocationType.PRODUCER, System.nanoTime());
-    EventBus.getInstance().triggerEvent(startedEvent);
-    operationMeta.getExecutor().execute(() -> runInExecutor(startedEvent));
+    invocation = InvocationFactory.forProvider(endpoint,
+        operationProtobuf.getOperationMeta(),
+        null);
+    invocation.onStart();
+    operationMeta.getExecutor().execute(() -> runInExecutor());
   }
 }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
index e7a694be5..641276621 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
@@ -61,7 +61,7 @@ protected void startListen(Future<Void> startFuture) {
       return;
     }
 
-    HighwayServer server = new HighwayServer(endpointObject);
+    HighwayServer server = new HighwayServer(endpoint);
     server.init(vertx, SSL_KEY, ar -> {
       if (ar.succeeded()) {
         InetSocketAddress socketAddress = ar.result();
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
index 8f197aa45..195741f16 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
@@ -43,8 +43,6 @@ public String getName() {
   public boolean init() throws Exception {
     highwayClient.init(transportVertx);
 
-    HighwayCodec.setHighwayTransport(this);
-
     DeploymentOptions deployOptions = new DeploymentOptions().setInstances(HighwayConfig.getServerThreadCount());
     setListenAddressWithoutSchema(HighwayConfig.getAddress(), Collections.singletonMap(TcpConst.LOGIN, "true"));
     SimpleJsonObject json = new SimpleJsonObject();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
index d387dafb7..1d7cd0998 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
@@ -19,12 +19,14 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.codec.protobuf.utils.schema.NotWrapSchema;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
@@ -51,6 +53,7 @@
 import io.vertx.core.buffer.Buffer;
 import mockit.Mock;
 import mockit.MockUp;
+import mockit.Mocked;
 
 public class TestHighwayCodec {
 
@@ -75,7 +78,6 @@
   @BeforeClass
   public static void setupClass() {
     ProtobufCompatibleUtils.init();
-    HighwayCodec.setHighwayTransport(new HighwayTransport());
   }
 
   @Before
@@ -126,17 +128,17 @@ public void tearDown() throws Exception {
   }
 
   @Test
-  public void testDecodeRequest() {
-    boolean status = true;
+  public void testDecodeRequest(@Mocked Endpoint endpoint) throws Exception {
+    commonMock();
+    Mockito.when(schemaMeta.getProviderHandlerChain()).thenReturn(Collections.emptyList());
+    Object[] args = new Object[] {};
+    Mockito.when(schema.readObject(bodyBuffer, null)).thenReturn(args);
+    
+    Invocation invocation = new Invocation(endpoint, operationMeta, null);
 
-    try {
-      commonMock();
-      Invocation inv = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, null);
-      Assert.assertNotNull(inv);
-    } catch (Exception e) {
-      status = false;
-    }
-    Assert.assertTrue(status);
+    HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, null);
+
+    Assert.assertSame(args, invocation.getSwaggerArguments());
   }
 
   @Test
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
index 8f6e2d4f3..e823e4af9 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
@@ -24,6 +24,7 @@
 import org.apache.servicecomb.codec.protobuf.utils.ProtobufSchemaUtils;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
@@ -58,6 +59,9 @@
   @Mocked
   MicroserviceMetaManager microserviceMetaManager;
 
+  @Mocked
+  Endpoint endpoint;
+
   @Mocked
   NetSocketImpl netSocket;
 
@@ -71,7 +75,7 @@ public void init() {
         result = new SocketAddressImpl(new InetSocketAddress("127.0.0.1", 80));
       }
     };
-    connection = new HighwayServerConnection();
+    connection = new HighwayServerConnection(endpoint);
     connection.init(netSocket);
 
     header = new RequestHeader();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
index fb9b3b993..0dc902c81 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
@@ -17,17 +17,27 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import javax.xml.ws.Holder;
+
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.executor.ReactiveExecutor;
 import org.apache.servicecomb.core.unittest.UnitTestMeta;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
 import org.apache.servicecomb.transport.common.MockUtil;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
 import io.netty.buffer.ByteBuf;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.NetSocket;
@@ -52,6 +62,16 @@ public int add(int x, int y) {
 
   private SocketAddress socketAddress;
 
+  @BeforeClass
+  public static void classSetup() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  @AfterClass
+  public static void classTeardown() {
+    EventManager.eventBus = new EventBus();
+  }
+
   @Before
   public void setup() {
     unitTestMeta = new UnitTestMeta();
@@ -87,6 +107,21 @@ public NetSocket getNetSocket() {
 
   @Test
   public void test() {
+    Holder<InvocationStartEvent> startHolder = new Holder<>();
+    Holder<InvocationFinishEvent> finishHolder = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationStartEvent event) {
+        startHolder.value = event;
+      }
+
+      @Subscribe
+      public void onFinish(InvocationFinishEvent event) {
+        finishHolder.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
     MockUtil.getInstance().mockHighwayCodec();
 
     SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class);
@@ -111,6 +146,11 @@ public void test() {
     // exe失败
     MockUtil.getInstance().decodeRequestSucc = false;
     highwayServerInvoke.execute();
+    EventManager.unregister(subscriber);
+
     Assert.assertEquals(true, Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
+    Assert.assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
+    Assert.assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
+    Assert.assertTrue(highwayServerInvoke.invocation.getStartExecutionTime() != 0);
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services