You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/03/15 11:12:21 UTC

[incubator-servicecomb-java-chassis] 05/05: SCB-374 invocation from highway transport publish event

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 4ccab5518c9553aafaac5c6637b097d7ce0f9176
Author: wujimin <wu...@huawei.com>
AuthorDate: Mon Mar 12 11:15:42 2018 +0800

    SCB-374 invocation from highway transport publish event
---
 .../transport/highway/HighwayCodec.java            | 15 ++------
 .../transport/highway/HighwayServer.java           |  9 +++--
 .../transport/highway/HighwayServerConnection.java |  9 ++++-
 .../transport/highway/HighwayServerInvoke.java     | 39 ++++++++++++---------
 .../transport/highway/HighwayServerVerticle.java   |  2 +-
 .../transport/highway/HighwayTransport.java        |  2 --
 .../transport/highway/TestHighwayCodec.java        | 24 +++++++------
 .../highway/TestHighwayServerConnection.java       |  6 +++-
 .../transport/highway/TestHighwayServerInvoke.java | 40 ++++++++++++++++++++++
 9 files changed, 97 insertions(+), 49 deletions(-)

diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
index 525ad57..fc5a148 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
@@ -20,7 +20,6 @@ package org.apache.servicecomb.transport.highway;
 import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.core.invocation.InvocationFactory;
 import org.apache.servicecomb.foundation.vertx.client.tcp.TcpData;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpOutputStream;
 import org.apache.servicecomb.swagger.invocation.Response;
@@ -31,15 +30,9 @@ import io.protostuff.runtime.ProtobufFeature;
 import io.vertx.core.buffer.Buffer;
 
 public final class HighwayCodec {
-  private static HighwayTransport highwayTransport;
-
   private HighwayCodec() {
   }
 
-  public static void setHighwayTransport(HighwayTransport highwayTransport) {
-    HighwayCodec.highwayTransport = highwayTransport;
-  }
-
   public static TcpOutputStream encodeRequest(long msgId, Invocation invocation,
       OperationProtobuf operationProtobuf, ProtobufFeature protobufFeature) throws Exception {
     // 写header
@@ -56,17 +49,13 @@ public final class HighwayCodec {
     return os;
   }
 
-  public static Invocation decodeRequest(RequestHeader header, OperationProtobuf operationProtobuf,
+  public static void decodeRequest(Invocation invocation, RequestHeader header, OperationProtobuf operationProtobuf,
       Buffer bodyBuffer, ProtobufFeature protobufFeature) throws Exception {
     WrapSchema schema = operationProtobuf.getRequestSchema();
     Object[] args = schema.readObject(bodyBuffer, protobufFeature);
 
-    Invocation invocation =
-        InvocationFactory.forProvider(highwayTransport.getEndpoint(),
-            operationProtobuf.getOperationMeta(),
-            args);
+    invocation.setSwaggerArguments(args);
     invocation.setContext(header.getContext());
-    return invocation;
   }
 
   public static RequestHeader readRequestHeader(Buffer headerBuffer,
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
index c3c0c53..5aca854 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
@@ -17,18 +17,21 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
 import org.apache.servicecomb.foundation.vertx.server.TcpServer;
 import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
 
 public class HighwayServer extends TcpServer {
+  private Endpoint endpoint;
 
-  public HighwayServer(URIEndpointObject endpointObject) {
-    super(endpointObject);
+  public HighwayServer(Endpoint endpoint) {
+    super((URIEndpointObject) endpoint.getAddress());
+    this.endpoint = endpoint;
   }
 
   @Override
   protected TcpServerConnection createTcpServerConnection() {
-    return new HighwayServerConnection();
+    return new HighwayServerConnection(endpoint);
   }
 }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index a8d7452..79b5e02 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.transport.highway;
 
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.foundation.vertx.server.TcpBufferHandler;
 import org.apache.servicecomb.foundation.vertx.server.TcpParser;
 import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
@@ -35,8 +36,14 @@ import io.vertx.core.net.NetSocket;
 public class HighwayServerConnection extends TcpServerConnection implements TcpBufferHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(HighwayServerConnection.class);
 
+  private Endpoint endpoint;
+
   private ProtobufFeature protobufFeature = new ProtobufFeature();
 
+  public HighwayServerConnection(Endpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+
   @Override
   public void init(NetSocket netSocket) {
     splitter = new TcpParser(this);
@@ -115,7 +122,7 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
   }
 
   protected void onRequest(long msgId, RequestHeader header, Buffer bodyBuffer) {
-    HighwayServerInvoke invoke = new HighwayServerInvoke(protobufFeature);
+    HighwayServerInvoke invoke = new HighwayServerInvoke(endpoint, protobufFeature);
     if (invoke.init(this, msgId, header, bodyBuffer)) {
       invoke.execute();
     }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 0450187..3dd6006 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -24,15 +24,14 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
+import org.apache.servicecomb.core.invocation.InvocationFactory;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
@@ -62,11 +61,16 @@ public class HighwayServerInvoke {
 
   private Buffer bodyBuffer;
 
+  private Endpoint endpoint;
+
+  Invocation invocation;
+
   public HighwayServerInvoke() {
-    this(null);
+    this(null, null);
   }
 
-  public HighwayServerInvoke(ProtobufFeature protobufFeature) {
+  public HighwayServerInvoke(Endpoint endpoint, ProtobufFeature protobufFeature) {
+    this.endpoint = endpoint;
     this.protobufFeature = protobufFeature;
   }
 
@@ -107,9 +111,9 @@ public class HighwayServerInvoke {
     this.bodyBuffer = bodyBuffer;
   }
 
-  private void runInExecutor(InvocationStartedEvent startedEvent) {
+  private void runInExecutor() {
     try {
-      doRunInExecutor(startedEvent);
+      doRunInExecutor();
     } catch (Throwable e) {
       String msg = String.format("handle request error, %s, msgId=%d",
           operationMeta.getMicroserviceQualifiedName(),
@@ -120,16 +124,14 @@ public class HighwayServerInvoke {
     }
   }
 
-  private void doRunInExecutor(InvocationStartedEvent startedEvent) throws Exception {
-    Invocation invocation = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, protobufFeature);
+  private void doRunInExecutor() throws Exception {
+    invocation.onStartExecute();
+
+    HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, protobufFeature);
     invocation.getHandlerContext().put(Const.REMOTE_ADDRESS, this.connection.getNetSocket().remoteAddress());
-    //立刻设置开始时间,否则Finished时无法计算TotalTime
-    invocation.setStartTime(startedEvent.getStartedTime());
-    invocation.triggerStartExecutionEvent();
 
     invocation.next(response -> {
       sendResponse(invocation.getContext(), response);
-      invocation.triggerFinishedEvent(response.getStatusCode());
     });
   }
 
@@ -155,6 +157,8 @@ public class HighwayServerInvoke {
           operationProtobuf.getOperationMeta().getMicroserviceQualifiedName(),
           msgId);
       LOGGER.error(msg, e);
+    } finally {
+      invocation.onFinish(response);
     }
   }
 
@@ -162,9 +166,10 @@ public class HighwayServerInvoke {
    * start time in queue.
    */
   public void execute() {
-    InvocationStartedEvent startedEvent = new InvocationStartedEvent(operationMeta.getMicroserviceQualifiedName(),
-        InvocationType.PRODUCER, System.nanoTime());
-    EventBus.getInstance().triggerEvent(startedEvent);
-    operationMeta.getExecutor().execute(() -> runInExecutor(startedEvent));
+    invocation = InvocationFactory.forProvider(endpoint,
+        operationProtobuf.getOperationMeta(),
+        null);
+    invocation.onStart();
+    operationMeta.getExecutor().execute(() -> runInExecutor());
   }
 }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
index 9c892b8..123f5c7 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
@@ -66,7 +66,7 @@ public class HighwayServerVerticle extends AbstractVerticle {
       return;
     }
 
-    HighwayServer server = new HighwayServer(endpointObject);
+    HighwayServer server = new HighwayServer(endpoint);
     server.init(vertx, SSL_KEY, ar -> {
       if (ar.succeeded()) {
         InetSocketAddress socketAddress = ar.result();
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
index 8f197aa..195741f 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
@@ -43,8 +43,6 @@ public class HighwayTransport extends AbstractTransport {
   public boolean init() throws Exception {
     highwayClient.init(transportVertx);
 
-    HighwayCodec.setHighwayTransport(this);
-
     DeploymentOptions deployOptions = new DeploymentOptions().setInstances(HighwayConfig.getServerThreadCount());
     setListenAddressWithoutSchema(HighwayConfig.getAddress(), Collections.singletonMap(TcpConst.LOGIN, "true"));
     SimpleJsonObject json = new SimpleJsonObject();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
index d387daf..1d7cd09 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
@@ -19,12 +19,14 @@ package org.apache.servicecomb.transport.highway;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.codec.protobuf.utils.schema.NotWrapSchema;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
@@ -51,6 +53,7 @@ import io.protostuff.runtime.ProtobufFeature;
 import io.vertx.core.buffer.Buffer;
 import mockit.Mock;
 import mockit.MockUp;
+import mockit.Mocked;
 
 public class TestHighwayCodec {
 
@@ -75,7 +78,6 @@ public class TestHighwayCodec {
   @BeforeClass
   public static void setupClass() {
     ProtobufCompatibleUtils.init();
-    HighwayCodec.setHighwayTransport(new HighwayTransport());
   }
 
   @Before
@@ -126,17 +128,17 @@ public class TestHighwayCodec {
   }
 
   @Test
-  public void testDecodeRequest() {
-    boolean status = true;
+  public void testDecodeRequest(@Mocked Endpoint endpoint) throws Exception {
+    commonMock();
+    Mockito.when(schemaMeta.getProviderHandlerChain()).thenReturn(Collections.emptyList());
+    Object[] args = new Object[] {};
+    Mockito.when(schema.readObject(bodyBuffer, null)).thenReturn(args);
+    
+    Invocation invocation = new Invocation(endpoint, operationMeta, null);
 
-    try {
-      commonMock();
-      Invocation inv = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, null);
-      Assert.assertNotNull(inv);
-    } catch (Exception e) {
-      status = false;
-    }
-    Assert.assertTrue(status);
+    HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, null);
+
+    Assert.assertSame(args, invocation.getSwaggerArguments());
   }
 
   @Test
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
index 8f6e2d4..e823e4a 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
@@ -24,6 +24,7 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
 import org.apache.servicecomb.codec.protobuf.utils.ProtobufSchemaUtils;
 import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
 import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
 import org.apache.servicecomb.core.definition.MicroserviceMeta;
 import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
 import org.apache.servicecomb.core.definition.OperationMeta;
@@ -59,6 +60,9 @@ public class TestHighwayServerConnection {
   MicroserviceMetaManager microserviceMetaManager;
 
   @Mocked
+  Endpoint endpoint;
+
+  @Mocked
   NetSocketImpl netSocket;
 
   RequestHeader header = new RequestHeader();
@@ -71,7 +75,7 @@ public class TestHighwayServerConnection {
         result = new SocketAddressImpl(new InetSocketAddress("127.0.0.1", 80));
       }
     };
-    connection = new HighwayServerConnection();
+    connection = new HighwayServerConnection(endpoint);
     connection.init(netSocket);
 
     header = new RequestHeader();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
index fb9b3b9..0dc902c 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
@@ -17,17 +17,27 @@
 
 package org.apache.servicecomb.transport.highway;
 
+import javax.xml.ws.Holder;
+
 import org.apache.servicecomb.core.definition.OperationMeta;
 import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
 import org.apache.servicecomb.core.executor.ReactiveExecutor;
 import org.apache.servicecomb.core.unittest.UnitTestMeta;
+import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
 import org.apache.servicecomb.transport.common.MockUtil;
 import org.apache.servicecomb.transport.highway.message.RequestHeader;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
 import io.netty.buffer.ByteBuf;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.net.NetSocket;
@@ -52,6 +62,16 @@ public class TestHighwayServerInvoke {
 
   private SocketAddress socketAddress;
 
+  @BeforeClass
+  public static void classSetup() {
+    EventManager.eventBus = new EventBus();
+  }
+
+  @AfterClass
+  public static void classTeardown() {
+    EventManager.eventBus = new EventBus();
+  }
+
   @Before
   public void setup() {
     unitTestMeta = new UnitTestMeta();
@@ -87,6 +107,21 @@ public class TestHighwayServerInvoke {
 
   @Test
   public void test() {
+    Holder<InvocationStartEvent> startHolder = new Holder<>();
+    Holder<InvocationFinishEvent> finishHolder = new Holder<>();
+    Object subscriber = new Object() {
+      @Subscribe
+      public void onStart(InvocationStartEvent event) {
+        startHolder.value = event;
+      }
+
+      @Subscribe
+      public void onFinish(InvocationFinishEvent event) {
+        finishHolder.value = event;
+      }
+    };
+    EventManager.register(subscriber);
+
     MockUtil.getInstance().mockHighwayCodec();
 
     SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class);
@@ -111,6 +146,11 @@ public class TestHighwayServerInvoke {
     // exe失败
     MockUtil.getInstance().decodeRequestSucc = false;
     highwayServerInvoke.execute();
+    EventManager.unregister(subscriber);
+
     Assert.assertEquals(true, Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
+    Assert.assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
+    Assert.assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
+    Assert.assertTrue(highwayServerInvoke.invocation.getStartExecutionTime() != 0);
   }
 }

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.