You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/06/04 11:45:26 UTC
[servicecomb-java-chassis] branch master updated: [SCB-1985] allow
producer invocation process by filter chain
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/master by this push:
new 9bb31d5 [SCB-1985] allow producer invocation process by filter chain
9bb31d5 is described below
commit 9bb31d54c14060fc5e703cbc4ee7c08c73ed82ff
Author: wujimin <wu...@huawei.com>
AuthorDate: Thu Jun 4 15:51:38 2020 +0800
[SCB-1985] allow producer invocation process by filter chain
---
.../common/rest/RestProducerInvocationFlow.java | 87 ++++++++++++++++++++++
.../highway/HighwayProducerInvocationFlow.java | 51 +++++++++++++
.../transport/highway/HighwayServerConnection.java | 46 ++++++++++--
.../rest/servlet/ServletRestDispatcher.java | 15 ++++
.../transport/rest/vertx/VertxRestDispatcher.java | 16 ++++
5 files changed, 209 insertions(+), 6 deletions(-)
diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
new file mode 100644
index 0000000..f56929e
--- /dev/null
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.common.rest;
+
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+import static org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter.isDownloadFileResponseType;
+
+import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
+import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
+import org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
+import org.apache.servicecomb.core.invocation.ProducerInvocationFlow;
+import org.apache.servicecomb.foundation.common.utils.PartUtils;
+import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
+import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestProducerInvocationFlow extends ProducerInvocationFlow { // REST flavour of ProducerInvocationFlow: reports failures and results through responseEx
+ private static final Logger LOGGER = LoggerFactory.getLogger(RestProducerInvocationFlow.class);
+ // Processor obtained from ProduceProcessorManager.findDefaultProcessor(); used to encode the error response when no invocation could be created.
+ private static final ProduceProcessor DEFAULT_PRODUCE_PROCESSOR = ProduceProcessorManager.INSTANCE
+ .findDefaultProcessor();
+ /** Passes the creator and the request/response pair to the superclass (presumably stored there as requestEx/responseEx; verify in ProducerInvocationFlow). */
+ public RestProducerInvocationFlow(InvocationCreator invocationCreator,
+ HttpServletRequestEx requestEx, HttpServletResponseEx responseEx) {
+ super(invocationCreator, requestEx, responseEx);
+ }
+ /** Converts the creation failure into a Response, encodes it onto responseEx, flushes, and returns null (no invocation exists). */
+ @Override
+ protected Invocation sendCreateInvocationException(Throwable throwable) {
+ try {
+ Response response = Exceptions.exceptionToResponse(null, throwable, INTERNAL_SERVER_ERROR); // null: there is no invocation to attach the error to
+ RestServerCodecFilter.encodeResponse(response, false, DEFAULT_PRODUCE_PROCESSOR, responseEx);
+ } catch (Throwable e) {
+ LOGGER.error("Failed to send response when prepare invocation failed, request uri:{}",
+ requestEx.getRequestURI(), e);
+ }
+ // Runs even when encoding failed above; the operation is unknown at this point, hence the placeholder name used for logging.
+ flushResponse("UNKNOWN_OPERATION");
+ return null;
+ }
+ /** Finishes the HTTP exchange: download-type results are sent as a Part and flushed on completion, everything else is flushed directly. */
+ @Override
+ protected void sendResponse(Invocation invocation, Response response) {
+ if (isDownloadFileResponseType(invocation, response)) {
+ responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()))
+ .whenComplete((r, e) -> flushResponse(invocation.getMicroserviceQualifiedName())); // NOTE(review): the error argument e is ignored, so a failed sendPart is not logged here; confirm this is intended
+ return;
+ }
+ // Presumably the body was already written to responseEx earlier in the flow (not visible here); TODO confirm.
+ flushResponse(invocation.getMicroserviceQualifiedName());
+ }
+ /** Flushes the response buffer and then completes the async context; operationName is used only in the error logs. */
+ private void flushResponse(String operationName) {
+ try {
+ responseEx.flushBuffer();
+ } catch (Throwable flushException) {
+ LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}",
+ operationName, requestEx.getRequestURI(), flushException);
+ }
+ // Separate try block so that a flush failure does not prevent the async context from being completed.
+ try {
+ requestEx.getAsyncContext().complete();
+ } catch (Throwable completeException) {
+ LOGGER.error("Failed to complete async rest response, operation:{}, request uri:{}",
+ operationName, requestEx.getRequestURI(), completeException);
+ }
+ }
+}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
new file mode 100644
index 0000000..13f8204
--- /dev/null
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.transport.highway;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
+import org.apache.servicecomb.core.invocation.ProducerInvocationFlow;
+import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HighwayProducerInvocationFlow extends ProducerInvocationFlow { // highway (TCP) flavour of ProducerInvocationFlow: writes the response buffer to the connection
+ private static final Logger LOGGER = LoggerFactory.getLogger(HighwayProducerInvocationFlow.class);
+ // Connection that sendResponse writes the encoded response to.
+ private final TcpConnection connection;
+ // Message id of the request; in this class it is used only in the creation-failure log message.
+ private final long msgId;
+ /** Hands the creator to the superclass and keeps the connection and message id for later use. */
+ public HighwayProducerInvocationFlow(InvocationCreator invocationCreator, TcpConnection connection, long msgId) {
+ super(invocationCreator);
+ this.connection = connection;
+ this.msgId = msgId;
+ }
+ /** Logs the creation failure and returns null. NOTE(review): nothing is written back to the connection here, so the caller presumably gets no reply for this msgId; confirm this is intended. */
+ @Override
+ protected Invocation sendCreateInvocationException(Throwable throwable) {
+ LOGGER.error("Failed to prepare invocation, msgId={}", msgId, throwable);
+ return null;
+ }
+ /** Writes the response buffer held by the invocation's HighwayTransportContext to the connection; the response argument is not read here. */
+ @Override
+ protected void sendResponse(Invocation invocation, Response response) {
+ HighwayTransportContext transportContext = invocation.getTransportContext();
+ connection.write(transportContext.getResponseBuffer().getByteBuf()); // presumably the buffer was filled earlier in the flow; TODO confirm it cannot be null
+ }
+}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index 5e20ade..716efd3 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -18,7 +18,16 @@ package org.apache.servicecomb.transport.highway;
import javax.ws.rs.core.Response.Status;
+import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
+import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.SCBEngine;
+import org.apache.servicecomb.core.definition.MicroserviceMeta;
+import org.apache.servicecomb.core.definition.OperationMeta;
+import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
+import org.apache.servicecomb.core.invocation.InvocationFactory;
import org.apache.servicecomb.foundation.vertx.server.TcpBufferHandler;
import org.apache.servicecomb.foundation.vertx.server.TcpParser;
import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
@@ -35,7 +44,7 @@ import io.vertx.core.net.NetSocket;
public class HighwayServerConnection extends TcpServerConnection implements TcpBufferHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(HighwayServerConnection.class);
- private Endpoint endpoint;
+ private final Endpoint endpoint;
public HighwayServerConnection(Endpoint endpoint) {
this.endpoint = endpoint;
@@ -68,9 +77,8 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
}
protected RequestHeader decodeRequestHeader(long msgId, Buffer headerBuffer) {
- RequestHeader requestHeader = null;
try {
- requestHeader = HighwayCodec.readRequestHeader(headerBuffer);
+ return HighwayCodec.readRequestHeader(headerBuffer);
} catch (Exception e) {
String msg = String.format("decode request header error, msgId=%d",
msgId);
@@ -79,12 +87,10 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
netSocket.close();
return null;
}
-
- return requestHeader;
}
protected void onLogin(long msgId, RequestHeader header, Buffer bodyBuffer) {
- LoginRequest request = null;
+ LoginRequest request;
try {
request = LoginRequest.readObject(bodyBuffer);
} catch (Exception e) {
@@ -117,9 +123,37 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
}
protected void onRequest(long msgId, RequestHeader header, Buffer bodyBuffer) {
+ if (SCBEngine.getInstance().isFilterChainEnabled()) { // filter-chain mode: hand the request to HighwayProducerInvocationFlow instead of HighwayServerInvoke
+ InvocationCreator creator = () -> createInvocation(msgId, header, bodyBuffer); // deferred: the flow decides when the invocation is actually created
+ new HighwayProducerInvocationFlow(creator, this, msgId)
+ .run();
+ return;
+ }
+ // Filter chain disabled: fall back to the HighwayServerInvoke path, which executes only when init(...) returns true.
HighwayServerInvoke invoke = new HighwayServerInvoke(endpoint);
if (invoke.init(this, msgId, header, bodyBuffer)) {
invoke.execute();
}
}
+
+ public Invocation createInvocation(long msgId, RequestHeader header, Buffer bodyBuffer) { // builds the provider-side Invocation for one highway request
+ MicroserviceMeta microserviceMeta = SCBEngine.getInstance().getProducerMicroserviceMeta();
+ SchemaMeta schemaMeta = microserviceMeta.ensureFindSchemaMeta(header.getSchemaId()); // presumably throws when the schema is unknown; verify ensureFind* semantics
+ OperationMeta operationMeta = schemaMeta.ensureFindOperation(header.getOperationName());
+ // The third argument is passed as null; NOTE(review): presumably the arguments are decoded later from bodyBuffer, confirm.
+ Invocation invocation = InvocationFactory.forProvider(endpoint,
+ operationMeta,
+ null);
+ invocation.getHandlerContext().put(Const.REMOTE_ADDRESS, netSocket.remoteAddress()); // record the peer address in the handler context
+ // Attach the raw request data (connection, msgId, header, undecoded body, protobuf operation metadata) as the transport context.
+ HighwayTransportContext transportContext = new HighwayTransportContext()
+ .setConnection(this)
+ .setMsgId(msgId)
+ .setHeader(header)
+ .setBodyBuffer(bodyBuffer)
+ .setOperationProtobuf(ProtobufManager.getOrCreateOperation(invocation));
+ invocation.setTransportContext(transportContext);
+
+ return invocation;
+ }
}
diff --git a/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java b/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java
index bf1f52a..74766fe 100644
--- a/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java
+++ b/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java
@@ -23,10 +23,13 @@ import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.servicecomb.common.rest.RestProducerInvocationFlow;
import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.Transport;
+import org.apache.servicecomb.core.definition.MicroserviceMeta;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
@@ -38,11 +41,14 @@ public class ServletRestDispatcher {
private Transport transport;
+ private MicroserviceMeta microserviceMeta;
+
private List<HttpServerFilter> httpServerFilters = SPIServiceUtils.getSortedService(HttpServerFilter.class);
public void service(HttpServletRequest request, HttpServletResponse response) {
if (transport == null) {
transport = SCBEngine.getInstance().getTransportManager().findTransport(Const.RESTFUL);
+ microserviceMeta = SCBEngine.getInstance().getProducerMicroserviceMeta();
}
// 异步场景
@@ -53,6 +59,15 @@ public class ServletRestDispatcher {
HttpServletRequestEx requestEx = new StandardHttpServletRequestEx(request);
HttpServletResponseEx responseEx = new StandardHttpServletResponseEx(response);
+ if (SCBEngine.getInstance().isFilterChainEnabled()) {
+ ((StandardHttpServletRequestEx) requestEx).setCacheRequest(true);
+ InvocationCreator creator = new RestServletProducerInvocationCreator(microserviceMeta, transport.getEndpoint(),
+ requestEx, responseEx);
+ new RestProducerInvocationFlow(creator, requestEx, responseEx)
+ .run();
+ return;
+ }
+
RestServletProducerInvocation restProducerInvocation = new RestServletProducerInvocation();
restProducerInvocation.invoke(transport, requestEx, responseEx, httpServerFilters);
}
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
index e152cff..9782994 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
@@ -24,10 +24,14 @@ import javax.ws.rs.core.Response.Status.Family;
import org.apache.servicecomb.common.rest.AbstractRestInvocation;
import org.apache.servicecomb.common.rest.RestConst;
+import org.apache.servicecomb.common.rest.RestProducerInvocationFlow;
+import org.apache.servicecomb.common.rest.RestVertxProducerInvocationCreator;
import org.apache.servicecomb.common.rest.VertxRestInvocation;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.Transport;
+import org.apache.servicecomb.core.definition.MicroserviceMeta;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
import org.apache.servicecomb.foundation.vertx.http.VertxServerRequestToHttpServletRequest;
@@ -54,6 +58,8 @@ public class VertxRestDispatcher extends AbstractVertxHttpDispatcher {
private Transport transport;
+ private MicroserviceMeta microserviceMeta;
+
@Override
public int getOrder() {
return DynamicPropertyFactory.getInstance().getIntProperty(KEY_ORDER, Integer.MAX_VALUE).get();
@@ -194,10 +200,20 @@ public class VertxRestDispatcher extends AbstractVertxHttpDispatcher {
protected void onRequest(RoutingContext context) {
if (transport == null) {
transport = SCBEngine.getInstance().getTransportManager().findTransport(Const.RESTFUL);
+ microserviceMeta = SCBEngine.getInstance().getProducerMicroserviceMeta();
}
HttpServletRequestEx requestEx = new VertxServerRequestToHttpServletRequest(context);
HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());
+ if (SCBEngine.getInstance().isFilterChainEnabled()) {
+ InvocationCreator creator = new RestVertxProducerInvocationCreator(context,
+ microserviceMeta, transport.getEndpoint(),
+ requestEx, responseEx);
+ new RestProducerInvocationFlow(creator, requestEx, responseEx)
+ .run();
+ return;
+ }
+
VertxRestInvocation vertxRestInvocation = new VertxRestInvocation();
context.put(RestConst.REST_PRODUCER_INVOCATION, vertxRestInvocation);
vertxRestInvocation.invoke(transport, requestEx, responseEx, httpServerFilters);