You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/22 08:32:35 UTC

[incubator-servicecomb-java-chassis] 08/09: [SCB-486] edge server related client connections use the same context, so that file download can use pump logic.

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 78ab2c4f93b3ea0ed16fe6e4a1d54fc3f9191856
Author: wujimin <wu...@huawei.com>
AuthorDate: Sun Apr 22 04:22:15 2018 +0800

    [SCB-486] edge server related client connections use the same context, so that file download can use pump logic.
---
 .../servicecomb/edge/core/EdgeBootListener.java    |  5 ++++
 .../servicecomb/edge/core/EdgeInvocation.java      |  4 +++
 .../edge/core/EdgeRestServerVerticle.java          | 31 ++++++---------------
 .../edge/core/EdgeRestTransportClient.java         | 32 +++++++---------------
 .../servicecomb/edge/core/TestEdgeInvocation.java  |  1 +
 .../transport/rest/client/RestTransportClient.java | 14 ++++++++--
 .../rest/client/RestTransportClientManager.java    |  3 +-
 .../rest/client/TransportClientConfig.java         | 16 +++++++++--
 .../transport/rest/vertx/TransportConfig.java      | 12 ++++++++
 .../transport/rest/vertx/VertxRestTransport.java   |  3 +-
 10 files changed, 70 insertions(+), 51 deletions(-)

diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java
index 1762980..55ee392 100644
--- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java
@@ -20,6 +20,8 @@ package org.apache.servicecomb.edge.core;
 import org.apache.commons.configuration.Configuration;
 import org.apache.servicecomb.core.BootListener;
 import org.apache.servicecomb.core.executor.ExecutorManager;
+import org.apache.servicecomb.transport.rest.client.TransportClientConfig;
+import org.apache.servicecomb.transport.rest.vertx.TransportConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -36,6 +38,9 @@ public class EdgeBootListener implements BootListener {
       return;
     }
 
+    TransportClientConfig.setRestTransportClientCls(EdgeRestTransportClient.class);
+    TransportConfig.setRestServerVerticle(EdgeRestServerVerticle.class);
+
     String defaultExecutor = DynamicPropertyFactory.getInstance()
         .getStringProperty(ExecutorManager.KEY_EXECUTORS_DEFAULT, null)
         .get();
diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
index 71a9f65..4991443 100644
--- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
@@ -36,9 +36,12 @@ import org.apache.servicecomb.serviceregistry.RegistryUtils;
 import org.apache.servicecomb.serviceregistry.consumer.MicroserviceVersionRule;
 import org.apache.servicecomb.serviceregistry.definition.DefinitionConst;
 
+import io.vertx.core.Vertx;
 import io.vertx.ext.web.RoutingContext;
 
 public class EdgeInvocation extends AbstractRestInvocation {
+  public static final String EDGE_INVOCATION_CONTEXT = "edgeInvocationContext";
+
   protected String microserviceName;
 
   protected MicroserviceVersionRule microserviceVersionRule;
@@ -122,6 +125,7 @@ public class EdgeInvocation extends AbstractRestInvocation {
         restOperationMeta.getOperationMeta(),
         null);
     this.invocation.setSync(false);
+    this.invocation.getHandlerContext().put(EDGE_INVOCATION_CONTEXT, Vertx.currentContext());
     this.invocation.setResponseExecutor(new ReactiveResponseExecutor());
   }
 }
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java
similarity index 50%
copy from transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
copy to edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java
index f969ba5..a7d7529 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java
@@ -14,30 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.servicecomb.edge.core;
 
-package org.apache.servicecomb.transport.rest.client;
+import org.apache.servicecomb.transport.rest.client.RestTransportClient;
+import org.apache.servicecomb.transport.rest.vertx.RestServerVerticle;
 
-import org.apache.servicecomb.foundation.vertx.VertxUtils;
+public class EdgeRestServerVerticle extends RestServerVerticle {
+  @Override
+  public void start() throws Exception {
+    super.start();
 
-import io.vertx.core.Vertx;
-
-public final class RestTransportClientManager {
-  public static final RestTransportClientManager INSTANCE = new RestTransportClientManager();
-
-  // same instance in AbstractTranport. need refactor in future.
-  private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
-
-  private RestTransportClient restClient = new RestTransportClient();
-
-  private RestTransportClientManager() {
-    try {
-      restClient.init(transportVertx);
-    } catch (Exception e) {
-      throw new IllegalStateException("Failed to init RestTransportClient.", e);
-    }
-  }
-
-  public RestTransportClient getRestClient() {
-    return restClient;
+    RestTransportClient restClient = (RestTransportClient) config().getValue(RestTransportClient.class.getName());
+    restClient.getClientMgr().findClientPool(false, context);
   }
 }
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java
similarity index 50%
copy from transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
copy to edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java
index f969ba5..bfac386 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java
@@ -14,30 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.servicecomb.edge.core;
 
-package org.apache.servicecomb.transport.rest.client;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.foundation.vertx.client.http.HttpClientWithContext;
+import org.apache.servicecomb.transport.rest.client.RestTransportClient;
 
-import org.apache.servicecomb.foundation.vertx.VertxUtils;
+import io.vertx.core.Context;
 
-import io.vertx.core.Vertx;
-
-public final class RestTransportClientManager {
-  public static final RestTransportClientManager INSTANCE = new RestTransportClientManager();
-
-  // same instance in AbstractTranport. need refactor in future.
-  private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
-
-  private RestTransportClient restClient = new RestTransportClient();
-
-  private RestTransportClientManager() {
-    try {
-      restClient.init(transportVertx);
-    } catch (Exception e) {
-      throw new IllegalStateException("Failed to init RestTransportClient.", e);
-    }
-  }
-
-  public RestTransportClient getRestClient() {
-    return restClient;
+public class EdgeRestTransportClient extends RestTransportClient {
+  @Override
+  protected HttpClientWithContext findHttpClientPool(Invocation invocation) {
+    Context invocationContext = (Context) invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT);
+    return clientMgr.findClientPool(invocation.isSync(), invocationContext);
   }
 }
diff --git a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
index 302656f..2f821a8 100644
--- a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
+++ b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
@@ -228,5 +228,6 @@ public class TestEdgeInvocation {
     Invocation invocation = Deencapsulation.getField(edgeInvocation, "invocation");
     Assert.assertThat(invocation.getResponseExecutor(), Matchers.instanceOf(ReactiveResponseExecutor.class));
     Assert.assertFalse(invocation.isSync());
+    Assert.assertSame(context, invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT));
   }
 }
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java
index 937097a..8c43814 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java
@@ -37,15 +37,19 @@ import io.vertx.core.DeploymentOptions;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpClientOptions;
 
-public final class RestTransportClient {
+public class RestTransportClient {
   private static final Logger LOGGER = LoggerFactory.getLogger(RestTransportClient.class);
 
   private static final String SSL_KEY = "rest.consumer";
 
-  private ClientPoolManager<HttpClientWithContext> clientMgr;
+  protected ClientPoolManager<HttpClientWithContext> clientMgr;
 
   private List<HttpClientFilter> httpClientFilters;
 
+  public ClientPoolManager<HttpClientWithContext> getClientMgr() {
+    return clientMgr;
+  }
+
   public void init(Vertx vertx) throws Exception {
     httpClientFilters = SPIServiceUtils.getSortedService(HttpClientFilter.class);
 
@@ -69,7 +73,7 @@ public final class RestTransportClient {
   }
 
   public void send(Invocation invocation, AsyncResponse asyncResp) {
-    HttpClientWithContext httpClientWithContext = clientMgr.findClientPool(invocation.isSync());
+    HttpClientWithContext httpClientWithContext = findHttpClientPool(invocation);
     RestClientInvocation restClientInvocation = new RestClientInvocation(httpClientWithContext, httpClientFilters);
     try {
       restClientInvocation.invoke(invocation, asyncResp);
@@ -78,4 +82,8 @@ public final class RestTransportClient {
       LOGGER.error("vertx rest transport send error.", e);
     }
   }
+
+  protected HttpClientWithContext findHttpClientPool(Invocation invocation) {
+    return clientMgr.findClientPool(invocation.isSync());
+  }
 }
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
index f969ba5..3dfd785 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
@@ -27,10 +27,11 @@ public final class RestTransportClientManager {
   // same instance in AbstractTranport. need refactor in future.
   private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
 
-  private RestTransportClient restClient = new RestTransportClient();
+  private RestTransportClient restClient;
 
   private RestTransportClientManager() {
     try {
+      restClient = TransportClientConfig.getRestTransportClientCls().newInstance();
       restClient.init(transportVertx);
     } catch (Exception e) {
       throw new IllegalStateException("Failed to init RestTransportClient.", e);
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java
index 9803267..15d1226 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java
@@ -20,9 +20,19 @@ package org.apache.servicecomb.transport.rest.client;
 import com.netflix.config.DynamicPropertyFactory;
 
 public final class TransportClientConfig {
+  private static Class<? extends RestTransportClient> restTransportClientCls = RestTransportClient.class;
+
   private TransportClientConfig() {
   }
 
+  public static Class<? extends RestTransportClient> getRestTransportClientCls() {
+    return restTransportClientCls;
+  }
+
+  public static void setRestTransportClientCls(Class<? extends RestTransportClient> restTransportClientCls) {
+    TransportClientConfig.restTransportClientCls = restTransportClientCls;
+  }
+
   public static int getThreadCount() {
     return DynamicPropertyFactory.getInstance().getIntProperty("cse.rest.client.thread-count", 1).get();
   }
@@ -40,8 +50,10 @@ public final class TransportClientConfig {
   public static boolean getConnectionKeepAlive() {
     return DynamicPropertyFactory.getInstance().getBooleanProperty("cse.rest.client.connection.keepAlive", true).get();
   }
-  
+
   public static boolean getConnectionCompression() {
-    return DynamicPropertyFactory.getInstance().getBooleanProperty("cse.rest.client.connection.compression", false).get();
+    return DynamicPropertyFactory.getInstance()
+        .getBooleanProperty("cse.rest.client.connection.compression", false)
+        .get();
   }
 }
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
index fee033c..01f13c8 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
@@ -21,6 +21,8 @@ import com.netflix.config.DynamicIntProperty;
 import com.netflix.config.DynamicPropertyFactory;
 import com.netflix.config.DynamicStringProperty;
 
+import io.vertx.core.Verticle;
+
 public final class TransportConfig {
 
   public static final int DEFAULT_SERVER_THREAD_COUNT = 1;
@@ -32,9 +34,19 @@ public final class TransportConfig {
   // 32K
   public static final int DEFAULT_SERVER_MAX_HEADER_SIZE = 32 * 1024;
 
+  private static Class<? extends Verticle> restServerVerticle = RestServerVerticle.class;
+
   private TransportConfig() {
   }
 
+  public static Class<? extends Verticle> getRestServerVerticle() {
+    return restServerVerticle;
+  }
+
+  public static void setRestServerVerticle(Class<? extends Verticle> restServerVerticle) {
+    TransportConfig.restServerVerticle = restServerVerticle;
+  }
+
   public static String getAddress() {
     DynamicStringProperty address =
         DynamicPropertyFactory.getInstance().getStringProperty("cse.rest.address", null);
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java
index 66f4638..008792c 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java
@@ -74,8 +74,9 @@ public class VertxRestTransport extends AbstractTransport {
     DeploymentOptions options = new DeploymentOptions().setInstances(TransportConfig.getThreadCount());
     SimpleJsonObject json = new SimpleJsonObject();
     json.put(ENDPOINT_KEY, getEndpoint());
+    json.put(RestTransportClient.class.getName(), restClient);
     options.setConfig(json);
-    return VertxUtils.blockDeploy(transportVertx, RestServerVerticle.class, options);
+    return VertxUtils.blockDeploy(transportVertx, TransportConfig.getRestServerVerticle(), options);
   }
 
   @Override

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.