You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/04/22 08:32:35 UTC
[incubator-servicecomb-java-chassis] 08/09: [SCB-486] edge server
related client connection use the same context,
so that file download can use pump logic.
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 78ab2c4f93b3ea0ed16fe6e4a1d54fc3f9191856
Author: wujimin <wu...@huawei.com>
AuthorDate: Sun Apr 22 04:22:15 2018 +0800
[SCB-486] edge server related client connection use the same context, so that file download can use pump logic.
---
.../servicecomb/edge/core/EdgeBootListener.java | 5 ++++
.../servicecomb/edge/core/EdgeInvocation.java | 4 +++
.../edge/core/EdgeRestServerVerticle.java | 31 ++++++---------------
.../edge/core/EdgeRestTransportClient.java | 32 +++++++---------------
.../servicecomb/edge/core/TestEdgeInvocation.java | 1 +
.../transport/rest/client/RestTransportClient.java | 14 ++++++++--
.../rest/client/RestTransportClientManager.java | 3 +-
.../rest/client/TransportClientConfig.java | 16 +++++++++--
.../transport/rest/vertx/TransportConfig.java | 12 ++++++++
.../transport/rest/vertx/VertxRestTransport.java | 3 +-
10 files changed, 70 insertions(+), 51 deletions(-)
diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java
index 1762980..55ee392 100644
--- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeBootListener.java
@@ -20,6 +20,8 @@ package org.apache.servicecomb.edge.core;
import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.core.BootListener;
import org.apache.servicecomb.core.executor.ExecutorManager;
+import org.apache.servicecomb.transport.rest.client.TransportClientConfig;
+import org.apache.servicecomb.transport.rest.vertx.TransportConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -36,6 +38,9 @@ public class EdgeBootListener implements BootListener {
return;
}
+ TransportClientConfig.setRestTransportClientCls(EdgeRestTransportClient.class);
+ TransportConfig.setRestServerVerticle(EdgeRestServerVerticle.class);
+
String defaultExecutor = DynamicPropertyFactory.getInstance()
.getStringProperty(ExecutorManager.KEY_EXECUTORS_DEFAULT, null)
.get();
diff --git a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
index 71a9f65..4991443 100644
--- a/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeInvocation.java
@@ -36,9 +36,12 @@ import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.consumer.MicroserviceVersionRule;
import org.apache.servicecomb.serviceregistry.definition.DefinitionConst;
+import io.vertx.core.Vertx;
import io.vertx.ext.web.RoutingContext;
public class EdgeInvocation extends AbstractRestInvocation {
+ public static final String EDGE_INVOCATION_CONTEXT = "edgeInvocationContext";
+
protected String microserviceName;
protected MicroserviceVersionRule microserviceVersionRule;
@@ -122,6 +125,7 @@ public class EdgeInvocation extends AbstractRestInvocation {
restOperationMeta.getOperationMeta(),
null);
this.invocation.setSync(false);
+ this.invocation.getHandlerContext().put(EDGE_INVOCATION_CONTEXT, Vertx.currentContext());
this.invocation.setResponseExecutor(new ReactiveResponseExecutor());
}
}
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java
similarity index 50%
copy from transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
copy to edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java
index f969ba5..a7d7529 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestServerVerticle.java
@@ -14,30 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.servicecomb.edge.core;
-package org.apache.servicecomb.transport.rest.client;
+import org.apache.servicecomb.transport.rest.client.RestTransportClient;
+import org.apache.servicecomb.transport.rest.vertx.RestServerVerticle;
-import org.apache.servicecomb.foundation.vertx.VertxUtils;
+public class EdgeRestServerVerticle extends RestServerVerticle {
+ @Override
+ public void start() throws Exception {
+ super.start();
-import io.vertx.core.Vertx;
-
-public final class RestTransportClientManager {
- public static final RestTransportClientManager INSTANCE = new RestTransportClientManager();
-
- // same instance in AbstractTranport. need refactor in future.
- private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
-
- private RestTransportClient restClient = new RestTransportClient();
-
- private RestTransportClientManager() {
- try {
- restClient.init(transportVertx);
- } catch (Exception e) {
- throw new IllegalStateException("Failed to init RestTransportClient.", e);
- }
- }
-
- public RestTransportClient getRestClient() {
- return restClient;
+ RestTransportClient restClient = (RestTransportClient) config().getValue(RestTransportClient.class.getName());
+ restClient.getClientMgr().findClientPool(false, context);
}
}
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java
similarity index 50%
copy from transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
copy to edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java
index f969ba5..bfac386 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
+++ b/edge/edge-core/src/main/java/org/apache/servicecomb/edge/core/EdgeRestTransportClient.java
@@ -14,30 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.servicecomb.edge.core;
-package org.apache.servicecomb.transport.rest.client;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.foundation.vertx.client.http.HttpClientWithContext;
+import org.apache.servicecomb.transport.rest.client.RestTransportClient;
-import org.apache.servicecomb.foundation.vertx.VertxUtils;
+import io.vertx.core.Context;
-import io.vertx.core.Vertx;
-
-public final class RestTransportClientManager {
- public static final RestTransportClientManager INSTANCE = new RestTransportClientManager();
-
- // same instance in AbstractTranport. need refactor in future.
- private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
-
- private RestTransportClient restClient = new RestTransportClient();
-
- private RestTransportClientManager() {
- try {
- restClient.init(transportVertx);
- } catch (Exception e) {
- throw new IllegalStateException("Failed to init RestTransportClient.", e);
- }
- }
-
- public RestTransportClient getRestClient() {
- return restClient;
+public class EdgeRestTransportClient extends RestTransportClient {
+ @Override
+ protected HttpClientWithContext findHttpClientPool(Invocation invocation) {
+ Context invocationContext = (Context) invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT);
+ return clientMgr.findClientPool(invocation.isSync(), invocationContext);
}
}
diff --git a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
index 302656f..2f821a8 100644
--- a/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
+++ b/edge/edge-core/src/test/java/org/apache/servicecomb/edge/core/TestEdgeInvocation.java
@@ -228,5 +228,6 @@ public class TestEdgeInvocation {
Invocation invocation = Deencapsulation.getField(edgeInvocation, "invocation");
Assert.assertThat(invocation.getResponseExecutor(), Matchers.instanceOf(ReactiveResponseExecutor.class));
Assert.assertFalse(invocation.isSync());
+ Assert.assertSame(context, invocation.getHandlerContext().get(EdgeInvocation.EDGE_INVOCATION_CONTEXT));
}
}
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java
index 937097a..8c43814 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClient.java
@@ -37,15 +37,19 @@ import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClientOptions;
-public final class RestTransportClient {
+public class RestTransportClient {
private static final Logger LOGGER = LoggerFactory.getLogger(RestTransportClient.class);
private static final String SSL_KEY = "rest.consumer";
- private ClientPoolManager<HttpClientWithContext> clientMgr;
+ protected ClientPoolManager<HttpClientWithContext> clientMgr;
private List<HttpClientFilter> httpClientFilters;
+ public ClientPoolManager<HttpClientWithContext> getClientMgr() {
+ return clientMgr;
+ }
+
public void init(Vertx vertx) throws Exception {
httpClientFilters = SPIServiceUtils.getSortedService(HttpClientFilter.class);
@@ -69,7 +73,7 @@ public final class RestTransportClient {
}
public void send(Invocation invocation, AsyncResponse asyncResp) {
- HttpClientWithContext httpClientWithContext = clientMgr.findClientPool(invocation.isSync());
+ HttpClientWithContext httpClientWithContext = findHttpClientPool(invocation);
RestClientInvocation restClientInvocation = new RestClientInvocation(httpClientWithContext, httpClientFilters);
try {
restClientInvocation.invoke(invocation, asyncResp);
@@ -78,4 +82,8 @@ public final class RestTransportClient {
LOGGER.error("vertx rest transport send error.", e);
}
}
+
+ protected HttpClientWithContext findHttpClientPool(Invocation invocation) {
+ return clientMgr.findClientPool(invocation.isSync());
+ }
}
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
index f969ba5..3dfd785 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestTransportClientManager.java
@@ -27,10 +27,11 @@ public final class RestTransportClientManager {
// same instance in AbstractTranport. need refactor in future.
private final Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
- private RestTransportClient restClient = new RestTransportClient();
+ private RestTransportClient restClient;
private RestTransportClientManager() {
try {
+ restClient = TransportClientConfig.getRestTransportClientCls().newInstance();
restClient.init(transportVertx);
} catch (Exception e) {
throw new IllegalStateException("Failed to init RestTransportClient.", e);
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java
index 9803267..15d1226 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/TransportClientConfig.java
@@ -20,9 +20,19 @@ package org.apache.servicecomb.transport.rest.client;
import com.netflix.config.DynamicPropertyFactory;
public final class TransportClientConfig {
+ private static Class<? extends RestTransportClient> restTransportClientCls = RestTransportClient.class;
+
private TransportClientConfig() {
}
+ public static Class<? extends RestTransportClient> getRestTransportClientCls() {
+ return restTransportClientCls;
+ }
+
+ public static void setRestTransportClientCls(Class<? extends RestTransportClient> restTransportClientCls) {
+ TransportClientConfig.restTransportClientCls = restTransportClientCls;
+ }
+
public static int getThreadCount() {
return DynamicPropertyFactory.getInstance().getIntProperty("cse.rest.client.thread-count", 1).get();
}
@@ -40,8 +50,10 @@ public final class TransportClientConfig {
public static boolean getConnectionKeepAlive() {
return DynamicPropertyFactory.getInstance().getBooleanProperty("cse.rest.client.connection.keepAlive", true).get();
}
-
+
public static boolean getConnectionCompression() {
- return DynamicPropertyFactory.getInstance().getBooleanProperty("cse.rest.client.connection.compression", false).get();
+ return DynamicPropertyFactory.getInstance()
+ .getBooleanProperty("cse.rest.client.connection.compression", false)
+ .get();
}
}
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
index fee033c..01f13c8 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/TransportConfig.java
@@ -21,6 +21,8 @@ import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
+import io.vertx.core.Verticle;
+
public final class TransportConfig {
public static final int DEFAULT_SERVER_THREAD_COUNT = 1;
@@ -32,9 +34,19 @@ public final class TransportConfig {
// 32K
public static final int DEFAULT_SERVER_MAX_HEADER_SIZE = 32 * 1024;
+ private static Class<? extends Verticle> restServerVerticle = RestServerVerticle.class;
+
private TransportConfig() {
}
+ public static Class<? extends Verticle> getRestServerVerticle() {
+ return restServerVerticle;
+ }
+
+ public static void setRestServerVerticle(Class<? extends Verticle> restServerVerticle) {
+ TransportConfig.restServerVerticle = restServerVerticle;
+ }
+
public static String getAddress() {
DynamicStringProperty address =
DynamicPropertyFactory.getInstance().getStringProperty("cse.rest.address", null);
diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java
index 66f4638..008792c 100644
--- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java
+++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/VertxRestTransport.java
@@ -74,8 +74,9 @@ public class VertxRestTransport extends AbstractTransport {
DeploymentOptions options = new DeploymentOptions().setInstances(TransportConfig.getThreadCount());
SimpleJsonObject json = new SimpleJsonObject();
json.put(ENDPOINT_KEY, getEndpoint());
+ json.put(RestTransportClient.class.getName(), restClient);
options.setConfig(json);
- return VertxUtils.blockDeploy(transportVertx, RestServerVerticle.class, options);
+ return VertxUtils.blockDeploy(transportVertx, TransportConfig.getRestServerVerticle(), options);
}
@Override
--
To stop receiving notification emails like this one, please contact
liubao@apache.org.