You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/03/15 11:12:16 UTC

[incubator-servicecomb-java-chassis] branch master updated (15d60b3 -> 4ccab55)

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git.


    from 15d60b3  SCB-369 add spectator information to LICENSE
     new f81a8b9  SCB-374 define invocation life event
     new 4ea8e6f  SCB-374 [WIP] invocation add publish life event method
     new c59abf6  SCB-374 [WIP] consumer invocation publish life event
     new aa6886d  SCB-374 [WIP] invocation from rest transport publish event
     new 4ccab55  SCB-374 invocation from highway transport publish event

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../common/rest/AbstractRestInvocation.java        |  25 ++--
 .../common/rest/RestProducerInvocation.java        |   5 +-
 .../common/rest/TestAbstractRestInvocation.java    |  76 ++++++++++--
 .../org/apache/servicecomb/core/Invocation.java    |  49 ++++++--
 .../core/event/InvocationFinishEvent.java          |  26 ++--
 .../core/event/InvocationStartEvent.java           |  16 +--
 .../core/provider/consumer/InvokerUtils.java       |  31 ++---
 .../apache/servicecomb/core/TestInvocation.java    | 138 +++++++++++++++++++++
 .../core/event/TestInvocationFinishEvent.java      |  30 +++--
 .../core/event/TestInvocationStartEvent.java       |  15 ++-
 .../servicecomb/edge/core/EdgeInvocation.java      |   4 +-
 .../servicecomb/edge/core/TestEdgeInvocation.java  |   3 +-
 .../transport/highway/HighwayCodec.java            |  15 +--
 .../transport/highway/HighwayServer.java           |   9 +-
 .../transport/highway/HighwayServerConnection.java |   9 +-
 .../transport/highway/HighwayServerInvoke.java     |  39 +++---
 .../transport/highway/HighwayServerVerticle.java   |   2 +-
 .../transport/highway/HighwayTransport.java        |   2 -
 .../transport/highway/TestHighwayCodec.java        |  24 ++--
 .../highway/TestHighwayServerConnection.java       |   6 +-
 .../transport/highway/TestHighwayServerInvoke.java |  40 ++++++
 21 files changed, 418 insertions(+), 146 deletions(-)
 copy handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/filter/TransactionControlFilter.java => core/src/main/java/org/apache/servicecomb/core/event/InvocationFinishEvent.java (67%)
 copy handlers/handler-bizkeeper/src/main/java/org/apache/servicecomb/bizkeeper/FallbackPolicy.java => core/src/main/java/org/apache/servicecomb/core/event/InvocationStartEvent.java (74%)
 create mode 100644 core/src/test/java/org/apache/servicecomb/core/TestInvocation.java
 copy swagger/swagger-invocation/invocation-core/src/test/java/org/apache/servicecomb/swagger/invocation/exception/TestDefaultExceptionToResponseConverter.java => core/src/test/java/org/apache/servicecomb/core/event/TestInvocationFinishEvent.java (59%)
 copy providers/provider-springmvc/src/test/java/org/apache/servicecomb/provider/springmvc/reference/TestCseRestTemplate.java => core/src/test/java/org/apache/servicecomb/core/event/TestInvocationStartEvent.java (71%)

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.

[incubator-servicecomb-java-chassis] 01/05: SCB-374 define invocation life event

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit f81a8b9ec1bc9358ab9481c8c98ae0b29261f1cc
Author: wujimin <wu...@huawei.com>
AuthorDate: Fri Mar 9 12:40:10 2018 +0800

    SCB-374 define invocation life event
---
 .../core/event/InvocationFinishEvent.java          | 46 +++++++++++++++++++++
 .../core/event/InvocationStartEvent.java           | 32 +++++++++++++++
 .../core/event/TestInvocationFinishEvent.java      | 47 ++++++++++++++++++++++
 .../core/event/TestInvocationStartEvent.java       | 34 ++++++++++++++++
 4 files changed, 159 insertions(+)

diff --git a/core/src/main/java/org/apache/servicecomb/core/event/InvocationFinishEvent.java b/core/src/main/java/org/apache/servicecomb/core/event/InvocationFinishEvent.java
new file mode 100644
index 0000000..d611f76
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/event/InvocationFinishEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.swagger.invocation.Response;
+
+public class InvocationFinishEvent {
+  private long nanoCurrent;
+
+  private Invocation invocation;
+
+  private Response response;
+
+  public InvocationFinishEvent(Invocation invocation, Response response) {
+    this.nanoCurrent = System.nanoTime();
+    this.invocation = invocation;
+    this.response = response;
+  }
+
+  public long getNanoCurrent() {
+    return nanoCurrent;
+  }
+
+  public Invocation getInvocation() {
+    return invocation;
+  }
+
+  public Response getResponse() {
+    return response;
+  }
+}
diff --git a/core/src/main/java/org/apache/servicecomb/core/event/InvocationStartEvent.java b/core/src/main/java/org/apache/servicecomb/core/event/InvocationStartEvent.java
new file mode 100644
index 0000000..5b93e66
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/event/InvocationStartEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+
+public class InvocationStartEvent {
+  private Invocation invocation;
+
+  public InvocationStartEvent(Invocation invocation) {
+    super();
+    this.invocation = invocation;
+  }
+
+  public Invocation getInvocation() {
+    return invocation;
+  }
+}
diff --git a/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationFinishEvent.java b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationFinishEvent.java
new file mode 100644
index 0000000..5fe6b83
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationFinishEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.Assert;
+import org.junit.Test;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class TestInvocationFinishEvent {
+  InvocationFinishEvent event;
+
+  @Test
+  public void construct(@Mocked Invocation invocation, @Mocked Response response) {
+    long time = 123;
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return time;
+      }
+    };
+
+    event = new InvocationFinishEvent(invocation, response);
+
+    Assert.assertEquals(time, event.getNanoCurrent());
+    Assert.assertSame(invocation, event.getInvocation());
+    Assert.assertSame(response, event.getResponse());
+  }
+}
diff --git a/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationStartEvent.java b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationStartEvent.java
new file mode 100644
index 0000000..9181520
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/event/TestInvocationStartEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.event;
+
+import org.apache.servicecomb.core.Invocation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import mockit.Mocked;
+
+public class TestInvocationStartEvent {
+  InvocationStartEvent event;
+
+  @Test
+  public void construct(@Mocked Invocation invocation) {
+    event = new InvocationStartEvent(invocation);
+
+    Assert.assertSame(invocation, event.getInvocation());
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.

[incubator-servicecomb-java-chassis] 03/05: SCB-374 [WIP] consumer invocation publish life event

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit c59abf66a663871d303eee683bf511b70e10a2b3
Author: wujimin <wu...@huawei.com>
AuthorDate: Fri Mar 9 15:41:30 2018 +0800

    SCB-374 [WIP] consumer invocation publish life event
---
 .../core/provider/consumer/InvokerUtils.java       | 31 ++++++++--------------
 1 file changed, 11 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
index f15c6bb..7dc01f3 100644
--- a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
+++ b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
@@ -20,10 +20,7 @@ package org.apache.servicecomb.core.provider.consumer;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.SchemaMeta;
 import org.apache.servicecomb.core.invocation.InvocationFactory;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.context.ContextUtils;
 import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
@@ -60,30 +57,30 @@ public final class InvokerUtils {
   }
 
   public static Response innerSyncInvoke(Invocation invocation) {
-    int statusCode = 0;
     try {
-      triggerStartedEvent(invocation);
+      invocation.onStart();
       SyncResponseExecutor respExecutor = new SyncResponseExecutor();
       invocation.setResponseExecutor(respExecutor);
 
       invocation.next(respExecutor::setResponse);
 
       Response response = respExecutor.waitResponse();
-      statusCode = response.getStatusCode();
+      invocation.onFinish(response);
       return response;
     } catch (Throwable e) {
       String msg =
           String.format("invoke failed, %s", invocation.getOperationMeta().getMicroserviceQualifiedName());
       LOGGER.debug(msg, e);
-      return Response.createConsumerFail(e);
-    } finally {
-      invocation.triggerFinishedEvent(statusCode);
+
+      Response response = Response.createConsumerFail(e);
+      invocation.onFinish(response);
+      return response;
     }
   }
 
   public static void reactiveInvoke(Invocation invocation, AsyncResponse asyncResp) {
     try {
-      triggerStartedEvent(invocation);
+      invocation.onStart();
       invocation.setSync(false);
 
       ReactiveResponseExecutor respExecutor = new ReactiveResponseExecutor();
@@ -92,7 +89,7 @@ public final class InvokerUtils {
       invocation.next(ar -> {
         ContextUtils.setInvocationContext(invocation.getParentContext());
         try {
-          invocation.triggerFinishedEvent(ar.getStatusCode());
+          invocation.onFinish(ar);
           asyncResp.handle(ar);
         } finally {
           ContextUtils.removeInvocationContext();
@@ -100,9 +97,10 @@ public final class InvokerUtils {
       });
     } catch (Throwable e) {
       //if throw exception,we can use 500 for status code ?
-      invocation.triggerFinishedEvent(500);
+      Response response = Response.createConsumerFail(e);
+      invocation.onFinish(response);
       LOGGER.error("invoke failed, {}", invocation.getOperationMeta().getMicroserviceQualifiedName());
-      asyncResp.consumerFail(e);
+      asyncResp.handle(response);
     }
   }
 
@@ -110,11 +108,4 @@ public final class InvokerUtils {
   public static Object invoke(Invocation invocation) {
     return syncInvoke(invocation);
   }
-
-  private static void triggerStartedEvent(Invocation invocation) {
-    long startTime = System.nanoTime();
-    EventBus.getInstance().triggerEvent(new InvocationStartedEvent(invocation.getMicroserviceQualifiedName(),
-        InvocationType.CONSUMER, startTime));
-    invocation.setStartTime(startTime);
-  }
 }

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.

[incubator-servicecomb-java-chassis] 02/05: SCB-374 [WIP] invocation add publish life event method

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 4ea8e6f34b398bd9513b28afa612dcf791c1c6fa
Author: wujimin <wu...@huawei.com>
AuthorDate: Fri Mar 9 15:25:00 2018 +0800

    SCB-374 [WIP] invocation add publish life event method
---
 .../org/apache/servicecomb/core/Invocation.java    |  49 ++++++--
 .../apache/servicecomb/core/TestInvocation.java    | 138 +++++++++++++++++++++
 2 files changed, 180 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/Invocation.java b/core/src/main/java/org/apache/servicecomb/core/Invocation.java
index d26c903..369f071 100644
--- a/core/src/main/java/org/apache/servicecomb/core/Invocation.java
+++ b/core/src/main/java/org/apache/servicecomb/core/Invocation.java
@@ -24,12 +24,16 @@ import java.util.concurrent.Executor;
 
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
-import org.apache.servicecomb.core.metrics.InvocationFinishedEvent;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.metrics.InvocationStartExecutionEvent;
+import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
 import org.apache.servicecomb.core.provider.consumer.ReferenceConfig;
 import org.apache.servicecomb.foundation.common.event.EventBus;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.apache.servicecomb.swagger.invocation.InvocationType;
+import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.SwaggerInvocation;
 
 public class Invocation extends SwaggerInvocation {
@@ -53,7 +57,6 @@ public class Invocation extends SwaggerInvocation {
 
   private int handlerIndex;
 
-
   // 应答的处理器
   // 同步模式:避免应答在网络线程中处理解码等等业务级逻辑
   private Executor responseExecutor;
@@ -64,8 +67,12 @@ public class Invocation extends SwaggerInvocation {
 
   private boolean sync = true;
 
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getStartExecutionTime() {
+    return startExecutionTime;
   }
 
   public Invocation(ReferenceConfig referenceConfig, OperationMeta operationMeta, Object[] swaggerArguments) {
@@ -185,7 +192,23 @@ public class Invocation extends SwaggerInvocation {
     return operationMeta.getMicroserviceQualifiedName();
   }
 
-  public void triggerStartExecutionEvent() {
+  public void onStart() {
+    this.startTime = System.nanoTime();
+    EventManager.post(new InvocationStartEvent(this));
+
+    // old logic, needs to be deleted
+    EventBus.getInstance().triggerEvent(new InvocationStartedEvent(getMicroserviceQualifiedName(),
+        invocationType, startTime));
+  }
+
+  public void onStartExecute() {
+    this.startExecutionTime = System.nanoTime();
+
+    // old logic, needs to be deleted
+    triggerStartExecutionEvent();
+  }
+
+  private void triggerStartExecutionEvent() {
     if (InvocationType.PRODUCER.equals(invocationType)) {
       this.startExecutionTime = System.nanoTime();
       EventBus.getInstance()
@@ -193,10 +216,18 @@ public class Invocation extends SwaggerInvocation {
     }
   }
 
-  public void triggerFinishedEvent(int statusCode) {
+  public void onFinish(Response response) {
+    EventManager.post(new InvocationFinishEvent(this, response));
+
+    // old logic, needs to be deleted
+    triggerFinishedEvent(response.getStatusCode());
+  }
+
+  private void triggerFinishedEvent(int statusCode) {
     long finishedTime = System.nanoTime();
     EventBus.getInstance()
-        .triggerEvent(new InvocationFinishedEvent(operationMeta.getMicroserviceQualifiedName(), this.invocationType,
+        .triggerEvent(new org.apache.servicecomb.core.metrics.InvocationFinishedEvent(
+            operationMeta.getMicroserviceQualifiedName(), this.invocationType,
             startExecutionTime - startTime, finishedTime - startExecutionTime,
             finishedTime - startTime, statusCode));
   }
@@ -208,4 +239,8 @@ public class Invocation extends SwaggerInvocation {
   public void setSync(boolean sync) {
     this.sync = sync;
   }
+
+  public boolean isConsumer() {
+    return InvocationType.CONSUMER.equals(invocationType);
+  }
 }
diff --git a/core/src/test/java/org/apache/servicecomb/core/TestInvocation.java b/core/src/test/java/org/apache/servicecomb/core/TestInvocation.java
new file mode 100644
index 0000000..49352ab
--- /dev/null
+++ b/core/src/test/java/org/apache/servicecomb/core/TestInvocation.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core;
+
+import javax.xml.ws.Holder;
+
+import org.apache.servicecomb.core.definition.OperationMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
+import org.apache.servicecomb.core.provider.consumer.ReferenceConfig;
+import org.apache.servicecomb.foundation.common.event.EventManager;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+public class TestInvocation {
+  Invocation invocation;
+
+  @Mocked
+  Endpoint endpoint;
+
+  @Mocked
+  OperationMeta operationMeta;
+
+  @Mocked
+  Object[] swaggerArguments;
+
+  static long currentNanoTime = 123;
+
+  @BeforeClass
+  public static void classSetup() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  protected static void mockNonaTime() {
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return currentNanoTime;
+      }
+    };
+  }
+
+  @AfterClass
+  public static void classTeardown() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  @Test
+  public void onStart() {
+    mockNonaTime();
+
+    Holder<Invocation> result = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationStartEvent event) {
+        result.value = event.getInvocation();
+      }
+    };
+    EventManager.register(subscriber);
+
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    invocation.onStart();
+
+    Assert.assertEquals(currentNanoTime, result.value.getStartTime());
+    Assert.assertSame(invocation, result.value);
+
+    EventManager.unregister(subscriber);
+  }
+
+  @Test
+  public void onStartExecute() {
+    mockNonaTime();
+
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    invocation.onStartExecute();
+
+    Assert.assertEquals(currentNanoTime, invocation.getStartExecutionTime());
+  }
+
+  @Test
+  public void onFinish(@Mocked Response response) {
+    mockNonaTime();
+
+    Holder<InvocationFinishEvent> result = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationFinishEvent event) {
+        result.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    invocation.onFinish(response);
+
+    Assert.assertEquals(currentNanoTime, result.value.getNanoCurrent());
+    Assert.assertSame(invocation, result.value.getInvocation());
+    Assert.assertSame(response, result.value.getResponse());
+
+    EventManager.unregister(subscriber);
+  }
+
+  @Test
+  public void isConsumer_yes() {
+    Invocation invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
+    Assert.assertFalse(invocation.isConsumer());
+  }
+
+  @Test
+  public void isConsumer_no(@Mocked ReferenceConfig referenceConfig) {
+    Invocation invocation = new Invocation(referenceConfig, operationMeta, swaggerArguments);
+    Assert.assertTrue(invocation.isConsumer());
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.

[incubator-servicecomb-java-chassis] 04/05: SCB-374 [WIP] invocation from rest transport publish event

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit aa6886df65704d260a72a9bc25878ff9fb9f29a9
Author: wujimin <wu...@huawei.com>
AuthorDate: Sat Mar 10 11:12:32 2018 +0800

    SCB-374 [WIP] invocation from rest transport publish event
---
 .../common/rest/AbstractRestInvocation.java        | 25 +++----
 .../common/rest/RestProducerInvocation.java        |  5 +-
 .../common/rest/TestAbstractRestInvocation.java    | 76 ++++++++++++++++++----
 .../servicecomb/edge/core/EdgeInvocation.java      |  4 +-
 .../servicecomb/edge/core/TestEdgeInvocation.java  |  3 +-
 5 files changed, 79 insertions(+), 34 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index 7d8ab2c..6e6f876 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -37,12 +37,9 @@ import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.OperationMeta;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
@@ -109,12 +106,10 @@ public abstract class AbstractRestInvocation {
   }
 
   protected void scheduleInvocation() {
+    createInvocation();
+    invocation.onStart();
     OperationMeta operationMeta = restOperationMeta.getOperationMeta();
 
-    InvocationStartedEvent startedEvent = new InvocationStartedEvent(operationMeta.getMicroserviceQualifiedName(),
-        InvocationType.PRODUCER, System.nanoTime());
-    EventBus.getInstance().triggerEvent(startedEvent);
-
     operationMeta.getExecutor().execute(() -> {
       synchronized (this.requestEx) {
         try {
@@ -127,7 +122,7 @@ public abstract class AbstractRestInvocation {
             return;
           }
 
-          runOnExecutor(startedEvent);
+          runOnExecutor();
         } catch (Throwable e) {
           LOGGER.error("rest server onRequest error", e);
           sendFailResponse(e);
@@ -136,19 +131,16 @@ public abstract class AbstractRestInvocation {
     });
   }
 
-  protected void runOnExecutor(InvocationStartedEvent startedEvent) {
-    createInvocation(null);
-
-    //立刻设置开始时间,否则Finished时无法计算TotalTime
-    invocation.setStartTime(startedEvent.getStartedTime());
-    invocation.triggerStartExecutionEvent();
+  protected void runOnExecutor() {
+    invocation.onStartExecute();
 
     invoke();
   }
 
   protected abstract OperationLocator locateOperation(ServicePathManager servicePathManager);
 
-  protected abstract void createInvocation(Object[] args);
+  // create an invocation whose args are not set yet
+  protected abstract void createInvocation();
 
   public void invoke() {
     try {
@@ -184,8 +176,6 @@ public abstract class AbstractRestInvocation {
   protected void doInvoke() throws Throwable {
     invocation.next(resp -> {
       sendResponseQuietly(resp);
-
-      invocation.triggerFinishedEvent(resp.getStatusCode());
     });
   }
 
@@ -207,6 +197,7 @@ public abstract class AbstractRestInvocation {
           e);
     } finally {
       requestEx.getAsyncContext().complete();
+      invocation.onFinish(response);
     }
   }
 
diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java
index cd5e5f3..2e0cbe9 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocation.java
@@ -50,6 +50,7 @@ public class RestProducerInvocation extends AbstractRestInvocation {
       return;
     }
 
+    createInvocation();
     scheduleInvocation();
   }
 
@@ -70,9 +71,9 @@ public class RestProducerInvocation extends AbstractRestInvocation {
   }
 
   @Override
-  protected void createInvocation(Object[] args) {
+  protected void createInvocation() {
     this.invocation = InvocationFactory.forProvider(transport.getEndpoint(),
         restOperationMeta.getOperationMeta(),
-        args);
+        null);
   }
 }
diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index aeac2ec..291c731 100644
--- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -39,27 +39,33 @@ import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.executor.ReactiveExecutor;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
 import org.apache.servicecomb.core.provider.consumer.ReferenceConfig;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.common.utils.JsonUtils;
 import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
 import org.apache.servicecomb.foundation.vertx.http.AbstractHttpServletRequest;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.swagger.invocation.response.Headers;
 import org.hamcrest.Matchers;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
 import io.vertx.core.buffer.Buffer;
 import mockit.Deencapsulation;
 import mockit.Expectations;
@@ -104,13 +110,23 @@ public class TestAbstractRestInvocation {
     }
 
     @Override
-    protected void createInvocation(Object[] args) {
+    protected void createInvocation() {
       this.invocation = TestAbstractRestInvocation.this.invocation;
     }
   }
 
   AbstractRestInvocation restInvocation = new AbstractRestInvocationForTest();
 
+  @BeforeClass
+  public static void classSetup() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  @AfterClass
+  public static void classTeardown() {
+    EventManager.eventBus = new EventBus();
+  }
+
   @Before
   public void setup() {
     invocation = new Invocation(endpoint, operationMeta, swaggerArguments);
@@ -369,6 +385,15 @@ public class TestAbstractRestInvocation {
 
   @Test
   public void sendResponseQuietlyNormal(@Mocked Response response) {
+    Holder<InvocationFinishEvent> eventHolder = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onFinished(InvocationFinishEvent event) {
+        eventHolder.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
     Holder<Response> result = new Holder<>();
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
@@ -384,6 +409,9 @@ public class TestAbstractRestInvocation {
 
     restInvocation.sendResponseQuietly(response);
 
+    EventManager.unregister(subscriber);
+
+    Assert.assertSame(invocation, eventHolder.value.getInvocation());
     Assert.assertSame(response, result.value);
   }
 
@@ -636,8 +664,6 @@ public class TestAbstractRestInvocation {
         result = operationMeta;
         operationMeta.getExecutor();
         result = executor;
-        operationMeta.getMicroserviceQualifiedName();
-        result = "sayHi";
       }
     };
 
@@ -645,7 +671,7 @@ public class TestAbstractRestInvocation {
     Error error = new Error("run on executor");
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
-      protected void runOnExecutor(InvocationStartedEvent startedEvent) {
+      protected void runOnExecutor() {
         throw error;
       }
 
@@ -682,7 +708,7 @@ public class TestAbstractRestInvocation {
 
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
-      protected void runOnExecutor(InvocationStartedEvent startedEvent) {
+      protected void runOnExecutor() {
         throw new Error("run on executor");
       }
 
@@ -700,6 +726,22 @@ public class TestAbstractRestInvocation {
 
   @Test
   public void scheduleInvocationNormal(@Mocked OperationMeta operationMeta) {
+    long time = 123;
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return time;
+      }
+    };
+    Holder<InvocationStartEvent> eventHolder = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationStartEvent event) {
+        eventHolder.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
     Executor executor = new ReactiveExecutor();
     requestEx = new AbstractHttpServletRequest() {
     };
@@ -710,15 +752,13 @@ public class TestAbstractRestInvocation {
         result = operationMeta;
         operationMeta.getExecutor();
         result = executor;
-        operationMeta.getMicroserviceQualifiedName();
-        result = "sayHi";
       }
     };
 
     Holder<Boolean> result = new Holder<>();
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
-      protected void runOnExecutor(InvocationStartedEvent startedEvent) {
+      protected void runOnExecutor() {
         result.value = true;
       }
     };
@@ -726,12 +766,23 @@ public class TestAbstractRestInvocation {
     restInvocation.restOperationMeta = restOperation;
 
     restInvocation.scheduleInvocation();
+    EventManager.unregister(subscriber);
 
     Assert.assertTrue(result.value);
+    Assert.assertEquals(time, invocation.getStartTime());
+    Assert.assertSame(invocation, eventHolder.value.getInvocation());
   }
 
   @Test
   public void runOnExecutor() {
+    long time = 123;
+    new MockUp<System>() {
+      @Mock
+      long nanoTime() {
+        return time;
+      }
+    };
+
     Holder<Boolean> result = new Holder<>();
     restInvocation = new AbstractRestInvocationForTest() {
       @Override
@@ -739,12 +790,15 @@ public class TestAbstractRestInvocation {
         result.value = true;
       }
     };
+    restInvocation.createInvocation();
     restInvocation.requestEx = requestEx;
     restInvocation.restOperationMeta = restOperation;
 
-    restInvocation.runOnExecutor(new InvocationStartedEvent("", InvocationType.PRODUCER, System.nanoTime()));
+    restInvocation.runOnExecutor();
+
     Assert.assertTrue(result.value);
     Assert.assertSame(invocation, restInvocation.invocation);
+    Assert.assertEquals(time, invocation.getStartExecutionTime());
   }
 
   @Test
diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
index f764d19..e898887 100644
--- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
@@ -112,7 +112,7 @@ public class EdgeInvocation extends AbstractRestInvocation {
   }
 
   @Override
-  protected void createInvocation(Object[] args) {
+  protected void createInvocation() {
     ReferenceConfig referenceConfig = new ReferenceConfig();
     referenceConfig.setMicroserviceMeta(latestMicroserviceVersionMeta.getMicroserviceMeta());
     referenceConfig.setMicroserviceVersionRule(microserviceVersionRule.getVersionRule().getVersionRule());
@@ -120,7 +120,7 @@ public class EdgeInvocation extends AbstractRestInvocation {
 
     this.invocation = InvocationFactory.forConsumer(referenceConfig,
         restOperationMeta.getOperationMeta(),
-        args);
+        null);
     this.invocation.setResponseExecutor(new ReactiveResponseExecutor());
   }
 }
diff --git a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
index 81160b4..5cbadbf 100644
--- a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
+++ b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
@@ -217,7 +217,6 @@ public class TestEdgeInvocation {
     edgeInvocation.microserviceVersionRule = microserviceVersionRule;
     Deencapsulation.setField(edgeInvocation, "restOperationMeta", restOperationMeta);
 
-    Object[] args = new Object[] {};
     new Expectations(RegistryUtils.class) {
       {
         RegistryUtils.getMicroservice();
@@ -225,7 +224,7 @@ public class TestEdgeInvocation {
       }
     };
 
-    edgeInvocation.createInvocation(args);
+    edgeInvocation.createInvocation();
     Invocation invocation = Deencapsulation.getField(edgeInvocation, "invocation");
     Assert.assertThat(invocation.getResponseExecutor(), Matchers.instanceOf(ReactiveResponseExecutor.class));
   }

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.

[incubator-servicecomb-java-chassis] 05/05: SCB-374 invocation from highway transport publish event

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 4ccab5518c9553aafaac5c6637b097d7ce0f9176
Author: wujimin <wu...@huawei.com>
AuthorDate: Mon Mar 12 11:15:42 2018 +0800

    SCB-374 invocation from highway transport publish event
---
 .../transport/highway/HighwayCodec.java            | 15 ++------
 .../transport/highway/HighwayServer.java           |  9 +++--
 .../transport/highway/HighwayServerConnection.java |  9 ++++-
 .../transport/highway/HighwayServerInvoke.java     | 39 ++++++++++++---------
 .../transport/highway/HighwayServerVerticle.java   |  2 +-
 .../transport/highway/HighwayTransport.java        |  2 --
 .../transport/highway/TestHighwayCodec.java        | 24 +++++++------
 .../highway/TestHighwayServerConnection.java       |  6 +++-
 .../transport/highway/TestHighwayServerInvoke.java | 40 ++++++++++++++++++++++
 9 files changed, 97 insertions(+), 49 deletions(-)

diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
index 525ad57..fc5a148 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
@@ -20,7 +20,6 @@ package org.apache.servicecomb.transport.highway;
 import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.core.invocation.InvocationFactory;
 import org.apache.servicecomb.foundation.vertx.client.tcp.TcpData;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpOutputStream;
 import org.apache.servicecomb.swagger.invocation.Response;
@@ -31,15 +30,9 @@ import io.protostuff.runtime.ProtobufFeature;
 import io.vertx.core.buffer.Buffer;
 
 public final class HighwayCodec {
-  private static HighwayTransport highwayTransport;
-
   private HighwayCodec() {
   }
 
-  public static void setHighwayTransport(HighwayTransport highwayTransport) {
-    HighwayCodec.highwayTransport = highwayTransport;
-  }
-
   public static TcpOutputStream encodeRequest(long msgId, Invocation invocation,
       OperationProtobuf operationProtobuf, ProtobufFeature protobufFeature) throws Exception {
     // 写header
@@ -56,17 +49,13 @@ public final class HighwayCodec {
     return os;
   }
 
-  public static Invocation decodeRequest(RequestHeader header, OperationProtobuf operationProtobuf,
+  public static void decodeRequest(Invocation invocation, RequestHeader header, OperationProtobuf operationProtobuf,
       Buffer bodyBuffer, ProtobufFeature protobufFeature) throws Exception {
     WrapSchema schema = operationProtobuf.getRequestSchema();
     Object[] args = schema.readObject(bodyBuffer, protobufFeature);
 
-    Invocation invocation =
-        InvocationFactory.forProvider(highwayTransport.getEndpoint(),
-            operationProtobuf.getOperationMeta(),
-            args);
+    invocation.setSwaggerArguments(args);
     invocation.setContext(header.getContext());
-    return invocation;
   }
 
   public static RequestHeader readRequestHeader(Buffer headerBuffer,
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
index c3c0c53..5aca854 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
@@ -17,18 +17,21 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
 import org.apache.servicecomb.foundation.vertx.server.TcpServer;
 import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
 
 public class HighwayServer extends TcpServer {
+  private Endpoint endpoint;
 
-  public HighwayServer(URIEndpointObject endpointObject) {
-    super(endpointObject);
+  public HighwayServer(Endpoint endpoint) {
+    super((URIEndpointObject) endpoint.getAddress());
+    this.endpoint = endpoint;
   }
 
   @Override
   protected TcpServerConnection createTcpServerConnection() {
-    return new HighwayServerConnection();
+    return new HighwayServerConnection(endpoint);
   }
 }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index a8d7452..79b5e02 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.transport.highway;
 
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.foundation.vertx.server.TcpBufferHandler;
 import org.apache.servicecomb.foundation.vertx.server.TcpParser;
 import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
@@ -35,8 +36,14 @@ import io.vertx.core.net.NetSocket;
 public class HighwayServerConnection extends TcpServerConnection implements TcpBufferHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(HighwayServerConnection.class);
 
+  private Endpoint endpoint;
+
   private ProtobufFeature protobufFeature = new ProtobufFeature();
 
+  public HighwayServerConnection(Endpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+
   @Override
   public void init(NetSocket netSocket) {
     splitter = new TcpParser(this);
@@ -115,7 +122,7 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
   }
 
   protected void onRequest(long msgId, RequestHeader header, Buffer bodyBuffer) {
-    HighwayServerInvoke invoke = new HighwayServerInvoke(protobufFeature);
+    HighwayServerInvoke invoke = new HighwayServerInvoke(endpoint, protobufFeature);
     if (invoke.init(this, msgId, header, bodyBuffer)) {
       invoke.execute();
     }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 0450187..3dd6006 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -24,15 +24,14 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
+import org.apache.servicecomb.core.invocation.InvocationFactory;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
@@ -62,11 +61,16 @@ public class HighwayServerInvoke {
 
   private Buffer bodyBuffer;
 
+  private Endpoint endpoint;
+
+  Invocation invocation;
+
   public HighwayServerInvoke() {
-    this(null);
+    this(null, null);
   }
 
-  public HighwayServerInvoke(ProtobufFeature protobufFeature) {
+  public HighwayServerInvoke(Endpoint endpoint, ProtobufFeature protobufFeature) {
+    this.endpoint = endpoint;
     this.protobufFeature = protobufFeature;
   }
 
@@ -107,9 +111,9 @@ public class HighwayServerInvoke {
     this.bodyBuffer = bodyBuffer;
   }
 
-  private void runInExecutor(InvocationStartedEvent startedEvent) {
+  private void runInExecutor() {
     try {
-      doRunInExecutor(startedEvent);
+      doRunInExecutor();
     } catch (Throwable e) {
       String msg = String.format("handle request error, %s, msgId=%d",
           operationMeta.getMicroserviceQualifiedName(),
@@ -120,16 +124,14 @@ public class HighwayServerInvoke {
     }
   }
 
-  private void doRunInExecutor(InvocationStartedEvent startedEvent) throws Exception {
-    Invocation invocation = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, protobufFeature);
+  private void doRunInExecutor() throws Exception {
+    invocation.onStartExecute();
+
+    HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, protobufFeature);
     invocation.getHandlerContext().put(Const.REMOTE_ADDRESS, this.connection.getNetSocket().remoteAddress());
-    //立刻设置开始时间,否则Finished时无法计算TotalTime
-    invocation.setStartTime(startedEvent.getStartedTime());
-    invocation.triggerStartExecutionEvent();
 
     invocation.next(response -> {
       sendResponse(invocation.getContext(), response);
-      invocation.triggerFinishedEvent(response.getStatusCode());
     });
   }
 
@@ -155,6 +157,8 @@ public class HighwayServerInvoke {
           operationProtobuf.getOperationMeta().getMicroserviceQualifiedName(),
           msgId);
       LOGGER.error(msg, e);
+    } finally {
+      invocation.onFinish(response);
     }
   }
 
@@ -162,9 +166,10 @@ public class HighwayServerInvoke {
    * start time in queue.
    */
   public void execute() {
-    InvocationStartedEvent startedEvent = new InvocationStartedEvent(operationMeta.getMicroserviceQualifiedName(),
-        InvocationType.PRODUCER, System.nanoTime());
-    EventBus.getInstance().triggerEvent(startedEvent);
-    operationMeta.getExecutor().execute(() -> runInExecutor(startedEvent));
+    invocation = InvocationFactory.forProvider(endpoint,
+        operationProtobuf.getOperationMeta(),
+        null);
+    invocation.onStart();
+    operationMeta.getExecutor().execute(() -> runInExecutor());
   }
 }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
index 9c892b8..123f5c7 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
@@ -66,7 +66,7 @@ public class HighwayServerVerticle extends AbstractVerticle {
       return;
     }
 
-    HighwayServer server = new HighwayServer(endpointObject);
+    HighwayServer server = new HighwayServer(endpoint);
     server.init(vertx, SSL_KEY, ar -> {
       if (ar.succeeded()) {
         InetSocketAddress socketAddress = ar.result();
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
index 8f197aa..195741f 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
@@ -43,8 +43,6 @@ public class HighwayTransport extends AbstractTransport {
   public boolean init() throws Exception {
     highwayClient.init(transportVertx);
 
-    HighwayCodec.setHighwayTransport(this);
-
     DeploymentOptions deployOptions = new DeploymentOptions().setInstances(HighwayConfig.getServerThreadCount());
     setListenAddressWithoutSchema(HighwayConfig.getAddress(), Collections.singletonMap(TcpConst.LOGIN, "true"));
     SimpleJsonObject json = new SimpleJsonObject();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
index d387daf..1d7cd09 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
@@ -19,12 +19,14 @@ package org.apache.servicecomb.transport.highway;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.codec.protobuf.utils.schema.NotWrapSchema;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
@@ -51,6 +53,7 @@ import io.protostuff.runtime.ProtobufFeature;
 import io.vertx.core.buffer.Buffer;
 import mockit.Mock;
 import mockit.MockUp;
+import mockit.Mocked;
 
 public class TestHighwayCodec {
 
@@ -75,7 +78,6 @@ public class TestHighwayCodec {
   @BeforeClass
   public static void setupClass() {
     ProtobufCompatibleUtils.init();
-    HighwayCodec.setHighwayTransport(new HighwayTransport());
   }
 
   @Before
@@ -126,17 +128,17 @@ public class TestHighwayCodec {
   }
 
   @Test
-  public void testDecodeRequest() {
-    boolean status = true;
+  public void testDecodeRequest(@Mocked Endpoint endpoint) throws Exception {
+    commonMock();
+    Mockito.when(schemaMeta.getProviderHandlerChain()).thenReturn(Collections.emptyList());
+    Object[] args = new Object[] {};
+    Mockito.when(schema.readObject(bodyBuffer, null)).thenReturn(args);
+    
+    Invocation invocation = new Invocation(endpoint, operationMeta, null);
 
-    try {
-      commonMock();
-      Invocation inv = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, null);
-      Assert.assertNotNull(inv);
-    } catch (Exception e) {
-      status = false;
-    }
-    Assert.assertTrue(status);
+    HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, null);
+
+    Assert.assertSame(args, invocation.getSwaggerArguments());
   }
 
   @Test
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
index 8f6e2d4..e823e4a 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
@@ -24,6 +24,7 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
 import org.apache.servicecomb.codec.protobuf.utils.ProtobufSchemaUtils;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
@@ -59,6 +60,9 @@ public class TestHighwayServerConnection {
   MicroserviceMetaManager microserviceMetaManager;
 
   @Mocked
+  Endpoint endpoint;
+
+  @Mocked
   NetSocketImpl netSocket;
 
   RequestHeader header = new RequestHeader();
@@ -71,7 +75,7 @@ public class TestHighwayServerConnection {
         result = new SocketAddressImpl(new InetSocketAddress("127.0.0.1", 80));
       }
     };
-    connection = new HighwayServerConnection();
+    connection = new HighwayServerConnection(endpoint);
     connection.init(netSocket);
 
     header = new RequestHeader();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
index fb9b3b9..0dc902c 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
@@ -17,17 +17,27 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import javax.xml.ws.Holder;
+
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.executor.ReactiveExecutor;
 import org.apache.servicecomb.core.unittest.UnitTestMeta;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
 import org.apache.servicecomb.transport.common.MockUtil;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
 import io.netty.buffer.ByteBuf;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.NetSocket;
@@ -52,6 +62,16 @@ public class TestHighwayServerInvoke {
 
   private SocketAddress socketAddress;
 
+  @BeforeClass
+  public static void classSetup() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  @AfterClass
+  public static void classTeardown() {
+    EventManager.eventBus = new EventBus();
+  }
+
   @Before
   public void setup() {
     unitTestMeta = new UnitTestMeta();
@@ -87,6 +107,21 @@ public class TestHighwayServerInvoke {
 
   @Test
   public void test() {
+    Holder<InvocationStartEvent> startHolder = new Holder<>();
+    Holder<InvocationFinishEvent> finishHolder = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationStartEvent event) {
+        startHolder.value = event;
+      }
+
+      @Subscribe
+      public void onFinish(InvocationFinishEvent event) {
+        finishHolder.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
     MockUtil.getInstance().mockHighwayCodec();
 
     SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class);
@@ -111,6 +146,11 @@ public class TestHighwayServerInvoke {
     // exe失败
     MockUtil.getInstance().decodeRequestSucc = false;
     highwayServerInvoke.execute();
+    EventManager.unregister(subscriber);
+
     Assert.assertEquals(true, Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
+    Assert.assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
+    Assert.assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
+    Assert.assertTrue(highwayServerInvoke.invocation.getStartExecutionTime() != 0);
   }
 }

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.