You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/03/15 11:12:21 UTC
[incubator-servicecomb-java-chassis] 05/05: SCB-374 invocation from
highway transport publish event
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 4ccab5518c9553aafaac5c6637b097d7ce0f9176
Author: wujimin <wu...@huawei.com>
AuthorDate: Mon Mar 12 11:15:42 2018 +0800
SCB-374 invocation from highway transport publish event
---
.../transport/highway/HighwayCodec.java | 15 ++------
.../transport/highway/HighwayServer.java | 9 +++--
.../transport/highway/HighwayServerConnection.java | 9 ++++-
.../transport/highway/HighwayServerInvoke.java | 39 ++++++++++++---------
.../transport/highway/HighwayServerVerticle.java | 2 +-
.../transport/highway/HighwayTransport.java | 2 --
.../transport/highway/TestHighwayCodec.java | 24 +++++++------
.../highway/TestHighwayServerConnection.java | 6 +++-
.../transport/highway/TestHighwayServerInvoke.java | 40 ++++++++++++++++++++++
9 files changed, 97 insertions(+), 49 deletions(-)
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
index 525ad57..fc5a148 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayCodec.java
@@ -20,7 +20,6 @@ package org.apache.servicecomb.transport.highway;
import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.core.invocation.InvocationFactory;
import org.apache.servicecomb.foundation.vertx.client.tcp.TcpData;
import org.apache.servicecomb.foundation.vertx.tcp.TcpOutputStream;
import org.apache.servicecomb.swagger.invocation.Response;
@@ -31,15 +30,9 @@ import io.protostuff.runtime.ProtobufFeature;
import io.vertx.core.buffer.Buffer;
public final class HighwayCodec {
- private static HighwayTransport highwayTransport;
-
private HighwayCodec() {
}
- public static void setHighwayTransport(HighwayTransport highwayTransport) {
- HighwayCodec.highwayTransport = highwayTransport;
- }
-
public static TcpOutputStream encodeRequest(long msgId, Invocation invocation,
OperationProtobuf operationProtobuf, ProtobufFeature protobufFeature) throws Exception {
// 写header
@@ -56,17 +49,13 @@ public final class HighwayCodec {
return os;
}
- public static Invocation decodeRequest(RequestHeader header, OperationProtobuf operationProtobuf,
+ public static void decodeRequest(Invocation invocation, RequestHeader header, OperationProtobuf operationProtobuf,
Buffer bodyBuffer, ProtobufFeature protobufFeature) throws Exception {
WrapSchema schema = operationProtobuf.getRequestSchema();
Object[] args = schema.readObject(bodyBuffer, protobufFeature);
- Invocation invocation =
- InvocationFactory.forProvider(highwayTransport.getEndpoint(),
- operationProtobuf.getOperationMeta(),
- args);
+ invocation.setSwaggerArguments(args);
invocation.setContext(header.getContext());
- return invocation;
}
public static RequestHeader readRequestHeader(Buffer headerBuffer,
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
index c3c0c53..5aca854 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java
@@ -17,18 +17,21 @@
package org.apache.servicecomb.transport.highway;
+import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
import org.apache.servicecomb.foundation.vertx.server.TcpServer;
import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
public class HighwayServer extends TcpServer {
+ private Endpoint endpoint;
- public HighwayServer(URIEndpointObject endpointObject) {
- super(endpointObject);
+ public HighwayServer(Endpoint endpoint) {
+ super((URIEndpointObject) endpoint.getAddress());
+ this.endpoint = endpoint;
}
@Override
protected TcpServerConnection createTcpServerConnection() {
- return new HighwayServerConnection();
+ return new HighwayServerConnection(endpoint);
}
}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index a8d7452..79b5e02 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -18,6 +18,7 @@ package org.apache.servicecomb.transport.highway;
import javax.ws.rs.core.Response.Status;
+import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.foundation.vertx.server.TcpBufferHandler;
import org.apache.servicecomb.foundation.vertx.server.TcpParser;
import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
@@ -35,8 +36,14 @@ import io.vertx.core.net.NetSocket;
public class HighwayServerConnection extends TcpServerConnection implements TcpBufferHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(HighwayServerConnection.class);
+ private Endpoint endpoint;
+
private ProtobufFeature protobufFeature = new ProtobufFeature();
+ public HighwayServerConnection(Endpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
@Override
public void init(NetSocket netSocket) {
splitter = new TcpParser(this);
@@ -115,7 +122,7 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
}
protected void onRequest(long msgId, RequestHeader header, Buffer bodyBuffer) {
- HighwayServerInvoke invoke = new HighwayServerInvoke(protobufFeature);
+ HighwayServerInvoke invoke = new HighwayServerInvoke(endpoint, protobufFeature);
if (invoke.init(this, msgId, header, bodyBuffer)) {
invoke.execute();
}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 0450187..3dd6006 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -24,15 +24,14 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.MicroserviceMeta;
import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.definition.SchemaMeta;
-import org.apache.servicecomb.core.metrics.InvocationStartedEvent;
-import org.apache.servicecomb.foundation.common.event.EventBus;
+import org.apache.servicecomb.core.invocation.InvocationFactory;
import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
-import org.apache.servicecomb.swagger.invocation.InvocationType;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.apache.servicecomb.transport.highway.message.RequestHeader;
@@ -62,11 +61,16 @@ public class HighwayServerInvoke {
private Buffer bodyBuffer;
+ private Endpoint endpoint;
+
+ Invocation invocation;
+
public HighwayServerInvoke() {
- this(null);
+ this(null, null);
}
- public HighwayServerInvoke(ProtobufFeature protobufFeature) {
+ public HighwayServerInvoke(Endpoint endpoint, ProtobufFeature protobufFeature) {
+ this.endpoint = endpoint;
this.protobufFeature = protobufFeature;
}
@@ -107,9 +111,9 @@ public class HighwayServerInvoke {
this.bodyBuffer = bodyBuffer;
}
- private void runInExecutor(InvocationStartedEvent startedEvent) {
+ private void runInExecutor() {
try {
- doRunInExecutor(startedEvent);
+ doRunInExecutor();
} catch (Throwable e) {
String msg = String.format("handle request error, %s, msgId=%d",
operationMeta.getMicroserviceQualifiedName(),
@@ -120,16 +124,14 @@ public class HighwayServerInvoke {
}
}
- private void doRunInExecutor(InvocationStartedEvent startedEvent) throws Exception {
- Invocation invocation = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, protobufFeature);
+ private void doRunInExecutor() throws Exception {
+ invocation.onStartExecute();
+
+ HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, protobufFeature);
invocation.getHandlerContext().put(Const.REMOTE_ADDRESS, this.connection.getNetSocket().remoteAddress());
- //立刻设置开始时间,否则Finished时无法计算TotalTime
- invocation.setStartTime(startedEvent.getStartedTime());
- invocation.triggerStartExecutionEvent();
invocation.next(response -> {
sendResponse(invocation.getContext(), response);
- invocation.triggerFinishedEvent(response.getStatusCode());
});
}
@@ -155,6 +157,8 @@ public class HighwayServerInvoke {
operationProtobuf.getOperationMeta().getMicroserviceQualifiedName(),
msgId);
LOGGER.error(msg, e);
+ } finally {
+ invocation.onFinish(response);
}
}
@@ -162,9 +166,10 @@ public class HighwayServerInvoke {
* start time in queue.
*/
public void execute() {
- InvocationStartedEvent startedEvent = new InvocationStartedEvent(operationMeta.getMicroserviceQualifiedName(),
- InvocationType.PRODUCER, System.nanoTime());
- EventBus.getInstance().triggerEvent(startedEvent);
- operationMeta.getExecutor().execute(() -> runInExecutor(startedEvent));
+ invocation = InvocationFactory.forProvider(endpoint,
+ operationProtobuf.getOperationMeta(),
+ null);
+ invocation.onStart();
+ operationMeta.getExecutor().execute(() -> runInExecutor());
}
}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
index 9c892b8..123f5c7 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java
@@ -66,7 +66,7 @@ public class HighwayServerVerticle extends AbstractVerticle {
return;
}
- HighwayServer server = new HighwayServer(endpointObject);
+ HighwayServer server = new HighwayServer(endpoint);
server.init(vertx, SSL_KEY, ar -> {
if (ar.succeeded()) {
InetSocketAddress socketAddress = ar.result();
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
index 8f197aa..195741f 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayTransport.java
@@ -43,8 +43,6 @@ public class HighwayTransport extends AbstractTransport {
public boolean init() throws Exception {
highwayClient.init(transportVertx);
- HighwayCodec.setHighwayTransport(this);
-
DeploymentOptions deployOptions = new DeploymentOptions().setInstances(HighwayConfig.getServerThreadCount());
setListenAddressWithoutSchema(HighwayConfig.getAddress(), Collections.singletonMap(TcpConst.LOGIN, "true"));
SimpleJsonObject json = new SimpleJsonObject();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
index d387daf..1d7cd09 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayCodec.java
@@ -19,12 +19,14 @@ package org.apache.servicecomb.transport.highway;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
import org.apache.servicecomb.codec.protobuf.utils.schema.NotWrapSchema;
+import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.definition.SchemaMeta;
@@ -51,6 +53,7 @@ import io.protostuff.runtime.ProtobufFeature;
import io.vertx.core.buffer.Buffer;
import mockit.Mock;
import mockit.MockUp;
+import mockit.Mocked;
public class TestHighwayCodec {
@@ -75,7 +78,6 @@ public class TestHighwayCodec {
@BeforeClass
public static void setupClass() {
ProtobufCompatibleUtils.init();
- HighwayCodec.setHighwayTransport(new HighwayTransport());
}
@Before
@@ -126,17 +128,17 @@ public class TestHighwayCodec {
}
@Test
- public void testDecodeRequest() {
- boolean status = true;
+ public void testDecodeRequest(@Mocked Endpoint endpoint) throws Exception {
+ commonMock();
+ Mockito.when(schemaMeta.getProviderHandlerChain()).thenReturn(Collections.emptyList());
+ Object[] args = new Object[] {};
+ Mockito.when(schema.readObject(bodyBuffer, null)).thenReturn(args);
+
+ Invocation invocation = new Invocation(endpoint, operationMeta, null);
- try {
- commonMock();
- Invocation inv = HighwayCodec.decodeRequest(header, operationProtobuf, bodyBuffer, null);
- Assert.assertNotNull(inv);
- } catch (Exception e) {
- status = false;
- }
- Assert.assertTrue(status);
+ HighwayCodec.decodeRequest(invocation, header, operationProtobuf, bodyBuffer, null);
+
+ Assert.assertSame(args, invocation.getSwaggerArguments());
}
@Test
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
index 8f6e2d4..e823e4a 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java
@@ -24,6 +24,7 @@ import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
import org.apache.servicecomb.codec.protobuf.utils.ProtobufSchemaUtils;
import org.apache.servicecomb.codec.protobuf.utils.WrapSchema;
import org.apache.servicecomb.core.CseContext;
+import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.definition.MicroserviceMeta;
import org.apache.servicecomb.core.definition.MicroserviceMetaManager;
import org.apache.servicecomb.core.definition.OperationMeta;
@@ -59,6 +60,9 @@ public class TestHighwayServerConnection {
MicroserviceMetaManager microserviceMetaManager;
@Mocked
+ Endpoint endpoint;
+
+ @Mocked
NetSocketImpl netSocket;
RequestHeader header = new RequestHeader();
@@ -71,7 +75,7 @@ public class TestHighwayServerConnection {
result = new SocketAddressImpl(new InetSocketAddress("127.0.0.1", 80));
}
};
- connection = new HighwayServerConnection();
+ connection = new HighwayServerConnection(endpoint);
connection.init(netSocket);
header = new RequestHeader();
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
index fb9b3b9..0dc902c 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerInvoke.java
@@ -17,17 +17,27 @@
package org.apache.servicecomb.transport.highway;
+import javax.xml.ws.Holder;
+
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.event.InvocationFinishEvent;
+import org.apache.servicecomb.core.event.InvocationStartEvent;
import org.apache.servicecomb.core.executor.ReactiveExecutor;
import org.apache.servicecomb.core.unittest.UnitTestMeta;
+import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
import org.apache.servicecomb.transport.common.MockUtil;
import org.apache.servicecomb.transport.highway.message.RequestHeader;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
import io.netty.buffer.ByteBuf;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
@@ -52,6 +62,16 @@ public class TestHighwayServerInvoke {
private SocketAddress socketAddress;
+ @BeforeClass
+ public static void classSetup() {
+ EventManager.eventBus = new EventBus();
+ }
+
+ @AfterClass
+ public static void classTeardown() {
+ EventManager.eventBus = new EventBus();
+ }
+
@Before
public void setup() {
unitTestMeta = new UnitTestMeta();
@@ -87,6 +107,21 @@ public class TestHighwayServerInvoke {
@Test
public void test() {
+ Holder<InvocationStartEvent> startHolder = new Holder<>();
+ Holder<InvocationFinishEvent> finishHolder = new Holder<>();
+ Object subscriber = new Object() {
+ @Subscribe
+ public void onStart(InvocationStartEvent event) {
+ startHolder.value = event;
+ }
+
+ @Subscribe
+ public void onFinish(InvocationFinishEvent event) {
+ finishHolder.value = event;
+ }
+ };
+ EventManager.register(subscriber);
+
MockUtil.getInstance().mockHighwayCodec();
SchemaMeta schemaMeta = unitTestMeta.getOrCreateSchemaMeta(Impl.class);
@@ -111,6 +146,11 @@ public class TestHighwayServerInvoke {
// exe失败
MockUtil.getInstance().decodeRequestSucc = false;
highwayServerInvoke.execute();
+ EventManager.unregister(subscriber);
+
Assert.assertEquals(true, Buffer.buffer(netSocketBuffer).toString().startsWith("CSE.TCP"));
+ Assert.assertSame(highwayServerInvoke.invocation, startHolder.value.getInvocation());
+ Assert.assertSame(highwayServerInvoke.invocation, finishHolder.value.getInvocation());
+ Assert.assertTrue(highwayServerInvoke.invocation.getStartExecutionTime() != 0);
}
}
--
To stop receiving notification emails like this one, please contact
liubao@apache.org.