You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2020/06/04 11:45:26 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1985] allow producer invocation process by filter chain

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bb31d5  [SCB-1985] allow producer invocation process by filter chain
9bb31d5 is described below

commit 9bb31d54c14060fc5e703cbc4ee7c08c73ed82ff
Author: wujimin <wu...@huawei.com>
AuthorDate: Thu Jun 4 15:51:38 2020 +0800

    [SCB-1985] allow producer invocation process by filter chain
---
 .../common/rest/RestProducerInvocationFlow.java    | 87 ++++++++++++++++++++++
 .../highway/HighwayProducerInvocationFlow.java     | 51 +++++++++++++
 .../transport/highway/HighwayServerConnection.java | 46 ++++++++++--
 .../rest/servlet/ServletRestDispatcher.java        | 15 ++++
 .../transport/rest/vertx/VertxRestDispatcher.java  | 16 ++++
 5 files changed, 209 insertions(+), 6 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
new file mode 100644
index 0000000..f56929e
--- /dev/null
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/RestProducerInvocationFlow.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.common.rest;
+
+import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+import static org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter.isDownloadFileResponseType;
+
+import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
+import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
+import org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.exception.Exceptions;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
+import org.apache.servicecomb.core.invocation.ProducerInvocationFlow;
+import org.apache.servicecomb.foundation.common.utils.PartUtils;
+import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
+import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestProducerInvocationFlow extends ProducerInvocationFlow {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RestProducerInvocationFlow.class);
+
+  private static final ProduceProcessor DEFAULT_PRODUCE_PROCESSOR = ProduceProcessorManager.INSTANCE
+      .findDefaultProcessor();
+
+  public RestProducerInvocationFlow(InvocationCreator invocationCreator,
+      HttpServletRequestEx requestEx, HttpServletResponseEx responseEx) {
+    super(invocationCreator, requestEx, responseEx);
+  }
+
+  @Override
+  protected Invocation sendCreateInvocationException(Throwable throwable) {
+    try {
+      Response response = Exceptions.exceptionToResponse(null, throwable, INTERNAL_SERVER_ERROR);
+      RestServerCodecFilter.encodeResponse(response, false, DEFAULT_PRODUCE_PROCESSOR, responseEx);
+    } catch (Throwable e) {
+      LOGGER.error("Failed to send response when prepare invocation failed, request uri:{}",
+          requestEx.getRequestURI(), e);
+    }
+
+    flushResponse("UNKNOWN_OPERATION");
+    return null;
+  }
+
+  @Override
+  protected void sendResponse(Invocation invocation, Response response) {
+    if (isDownloadFileResponseType(invocation, response)) {
+      responseEx.sendPart(PartUtils.getSinglePart(null, response.getResult()))
+          .whenComplete((r, e) -> flushResponse(invocation.getMicroserviceQualifiedName()));
+      return;
+    }
+
+    flushResponse(invocation.getMicroserviceQualifiedName());
+  }
+
+  private void flushResponse(String operationName) {
+    try {
+      responseEx.flushBuffer();
+    } catch (Throwable flushException) {
+      LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}",
+          operationName, requestEx.getRequestURI(), flushException);
+    }
+
+    try {
+      requestEx.getAsyncContext().complete();
+    } catch (Throwable completeException) {
+      LOGGER.error("Failed to complete async rest response, operation:{}, request uri:{}",
+          operationName, requestEx.getRequestURI(), completeException);
+    }
+  }
+}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
new file mode 100644
index 0000000..13f8204
--- /dev/null
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayProducerInvocationFlow.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.transport.highway;
+
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
+import org.apache.servicecomb.core.invocation.ProducerInvocationFlow;
+import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Producer-side invocation flow for the highway transport.
+ * <p>
+ * Keeps the {@link TcpConnection} and message id supplied by the caller; the response buffer
+ * of a processed invocation is written to that connection.
+ */
+public class HighwayProducerInvocationFlow extends ProducerInvocationFlow {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HighwayProducerInvocationFlow.class);
+
+  // Connection that sendResponse writes to.
+  private final TcpConnection connection;
+
+  // Message id of the request; within this class it is only used as log context.
+  private final long msgId;
+
+  /**
+   * @param invocationCreator callback that creates the invocation; handed to the base flow
+   * @param connection connection the response is written to
+   * @param msgId message id of the request
+   */
+  public HighwayProducerInvocationFlow(InvocationCreator invocationCreator, TcpConnection connection, long msgId) {
+    super(invocationCreator);
+    this.msgId = msgId;
+    this.connection = connection;
+  }
+
+  /**
+   * Logs a failure to create the invocation.
+   * <p>
+   * NOTE(review): nothing is written to the connection here, so the peer gets no reply from
+   * this method - confirm that this is intended.
+   *
+   * @param throwable the failure raised while creating the invocation
+   * @return always {@code null}
+   */
+  @Override
+  protected Invocation sendCreateInvocationException(Throwable throwable) {
+    LOGGER.error("Failed to prepare invocation, msgId={}", msgId, throwable);
+    return null;
+  }
+
+  /**
+   * Writes the response buffer held by the invocation's transport context to the connection.
+   * <p>
+   * Assumes the response buffer was already filled in before this hook runs - that step is not
+   * part of this class.
+   *
+   * @param invocation the processed invocation carrying a {@link HighwayTransportContext}
+   * @param response the response produced for the invocation; not read by this method
+   */
+  @Override
+  protected void sendResponse(Invocation invocation, Response response) {
+    HighwayTransportContext highwayContext = invocation.getTransportContext();
+    connection.write(highwayContext.getResponseBuffer().getByteBuf());
+  }
+}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
index 5e20ade..716efd3 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java
@@ -18,7 +18,16 @@ package org.apache.servicecomb.transport.highway;
 
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
+import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.SCBEngine;
+import org.apache.servicecomb.core.definition.MicroserviceMeta;
+import org.apache.servicecomb.core.definition.OperationMeta;
+import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
+import org.apache.servicecomb.core.invocation.InvocationFactory;
 import org.apache.servicecomb.foundation.vertx.server.TcpBufferHandler;
 import org.apache.servicecomb.foundation.vertx.server.TcpParser;
 import org.apache.servicecomb.foundation.vertx.server.TcpServerConnection;
@@ -35,7 +44,7 @@ import io.vertx.core.net.NetSocket;
 public class HighwayServerConnection extends TcpServerConnection implements TcpBufferHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(HighwayServerConnection.class);
 
-  private Endpoint endpoint;
+  private final Endpoint endpoint;
 
   public HighwayServerConnection(Endpoint endpoint) {
     this.endpoint = endpoint;
@@ -68,9 +77,8 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
   }
 
   protected RequestHeader decodeRequestHeader(long msgId, Buffer headerBuffer) {
-    RequestHeader requestHeader = null;
     try {
-      requestHeader = HighwayCodec.readRequestHeader(headerBuffer);
+      return HighwayCodec.readRequestHeader(headerBuffer);
     } catch (Exception e) {
       String msg = String.format("decode request header error, msgId=%d",
           msgId);
@@ -79,12 +87,10 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
       netSocket.close();
       return null;
     }
-
-    return requestHeader;
   }
 
   protected void onLogin(long msgId, RequestHeader header, Buffer bodyBuffer) {
-    LoginRequest request = null;
+    LoginRequest request;
     try {
       request = LoginRequest.readObject(bodyBuffer);
     } catch (Exception e) {
@@ -117,9 +123,37 @@ public class HighwayServerConnection extends TcpServerConnection implements TcpB
   }
 
   protected void onRequest(long msgId, RequestHeader header, Buffer bodyBuffer) {
+    // When the filter chain is enabled the request is handled by the flow-based pipeline;
+    // invocation creation is wrapped in a callback that is handed to the flow.
+    if (SCBEngine.getInstance().isFilterChainEnabled()) {
+      InvocationCreator invocationCreator = () -> createInvocation(msgId, header, bodyBuffer);
+      HighwayProducerInvocationFlow flow = new HighwayProducerInvocationFlow(invocationCreator, this, msgId);
+      flow.run();
+      return;
+    }
+
     HighwayServerInvoke invoke = new HighwayServerInvoke(endpoint);
     if (invoke.init(this, msgId, header, bodyBuffer)) {
       invoke.execute();
     }
   }
+
+  /**
+   * Builds the producer-side {@link Invocation} for one highway request.
+   * <p>
+   * The target operation is resolved from the producer microservice meta using the schema id
+   * and operation name in {@code header}. The lookups use the {@code ensureFind*} methods,
+   * which presumably fail when nothing matches - not visible here.
+   *
+   * @param msgId message id of the request; stored in the transport context
+   * @param header decoded request header naming the target schema and operation
+   * @param bodyBuffer request body; stored as-is in the transport context
+   * @return the invocation, with the remote address in its handler context and a
+   *         {@link HighwayTransportContext} attached
+   */
+  public Invocation createInvocation(long msgId, RequestHeader header, Buffer bodyBuffer) {
+    MicroserviceMeta producerMeta = SCBEngine.getInstance().getProducerMicroserviceMeta();
+    SchemaMeta schema = producerMeta.ensureFindSchemaMeta(header.getSchemaId());
+    OperationMeta operation = schema.ensureFindOperation(header.getOperationName());
+
+    Invocation invocation = InvocationFactory.forProvider(endpoint, operation, null);
+    invocation.getHandlerContext().put(Const.REMOTE_ADDRESS, netSocket.remoteAddress());
+
+    // Everything the highway transport needs later travels with the invocation.
+    invocation.setTransportContext(new HighwayTransportContext()
+        .setConnection(this)
+        .setMsgId(msgId)
+        .setHeader(header)
+        .setBodyBuffer(bodyBuffer)
+        .setOperationProtobuf(ProtobufManager.getOrCreateOperation(invocation)));
+
+    return invocation;
+  }
 }
diff --git a/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java b/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java
index bf1f52a..74766fe 100644
--- a/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java
+++ b/transports/transport-rest/transport-rest-servlet/src/main/java/org/apache/servicecomb/transport/rest/servlet/ServletRestDispatcher.java
@@ -23,10 +23,13 @@ import javax.servlet.AsyncContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.servicecomb.common.rest.RestProducerInvocationFlow;
 import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
 import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.SCBEngine;
 import org.apache.servicecomb.core.Transport;
+import org.apache.servicecomb.core.definition.MicroserviceMeta;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
 import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
@@ -38,11 +41,14 @@ public class ServletRestDispatcher {
 
   private Transport transport;
 
+  private MicroserviceMeta microserviceMeta;
+
   private List<HttpServerFilter> httpServerFilters = SPIServiceUtils.getSortedService(HttpServerFilter.class);
 
   public void service(HttpServletRequest request, HttpServletResponse response) {
     if (transport == null) {
       transport = SCBEngine.getInstance().getTransportManager().findTransport(Const.RESTFUL);
+      microserviceMeta = SCBEngine.getInstance().getProducerMicroserviceMeta();
     }
 
     // 异步场景
@@ -53,6 +59,15 @@ public class ServletRestDispatcher {
     HttpServletRequestEx requestEx = new StandardHttpServletRequestEx(request);
     HttpServletResponseEx responseEx = new StandardHttpServletResponseEx(response);
 
+    if (SCBEngine.getInstance().isFilterChainEnabled()) {
+      ((StandardHttpServletRequestEx) requestEx).setCacheRequest(true);
+      InvocationCreator creator = new RestServletProducerInvocationCreator(microserviceMeta, transport.getEndpoint(),
+          requestEx, responseEx);
+      new RestProducerInvocationFlow(creator, requestEx, responseEx)
+          .run();
+      return;
+    }
+
     RestServletProducerInvocation restProducerInvocation = new RestServletProducerInvocation();
     restProducerInvocation.invoke(transport, requestEx, responseEx, httpServerFilters);
   }
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
index e152cff..9782994 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestDispatcher.java
@@ -24,10 +24,14 @@ import javax.ws.rs.core.Response.Status.Family;
 
 import org.apache.servicecomb.common.rest.AbstractRestInvocation;
 import org.apache.servicecomb.common.rest.RestConst;
+import org.apache.servicecomb.common.rest.RestProducerInvocationFlow;
+import org.apache.servicecomb.common.rest.RestVertxProducerInvocationCreator;
 import org.apache.servicecomb.common.rest.VertxRestInvocation;
 import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.SCBEngine;
 import org.apache.servicecomb.core.Transport;
+import org.apache.servicecomb.core.definition.MicroserviceMeta;
+import org.apache.servicecomb.core.invocation.InvocationCreator;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
 import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
 import org.apache.servicecomb.foundation.vertx.http.VertxServerRequestToHttpServletRequest;
@@ -54,6 +58,8 @@ public class VertxRestDispatcher extends AbstractVertxHttpDispatcher {
 
   private Transport transport;
 
+  private MicroserviceMeta microserviceMeta;
+
   @Override
   public int getOrder() {
     return DynamicPropertyFactory.getInstance().getIntProperty(KEY_ORDER, Integer.MAX_VALUE).get();
@@ -194,10 +200,20 @@ public class VertxRestDispatcher extends AbstractVertxHttpDispatcher {
   protected void onRequest(RoutingContext context) {
     if (transport == null) {
       transport = SCBEngine.getInstance().getTransportManager().findTransport(Const.RESTFUL);
+      microserviceMeta = SCBEngine.getInstance().getProducerMicroserviceMeta();
     }
     HttpServletRequestEx requestEx = new VertxServerRequestToHttpServletRequest(context);
     HttpServletResponseEx responseEx = new VertxServerResponseToHttpServletResponse(context.response());
 
+    if (SCBEngine.getInstance().isFilterChainEnabled()) {
+      InvocationCreator creator = new RestVertxProducerInvocationCreator(context,
+          microserviceMeta, transport.getEndpoint(),
+          requestEx, responseEx);
+      new RestProducerInvocationFlow(creator, requestEx, responseEx)
+          .run();
+      return;
+    }
+
     VertxRestInvocation vertxRestInvocation = new VertxRestInvocation();
     context.put(RestConst.REST_PRODUCER_INVOCATION, vertxRestInvocation);
     vertxRestInvocation.invoke(transport, requestEx, responseEx, httpServerFilters);