You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/06/05 07:20:37 UTC
[incubator-servicecomb-java-chassis] 01/03: Client Request Timeout
support for operation/schema/service level
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 906fb55ea57791273764b3c774ef5dc09037bf78
Author: maheshrajus <ma...@huawei.com>
AuthorDate: Thu May 31 12:56:27 2018 +0530
Client Request Timeout support for operation/schema/service level
---
.../core/transport/AbstractTransport.java | 104 ++++++++++++++++++---
.../core/transport/TestAbstractTransport.java | 79 +++++++++++++++-
.../servicecomb/demo/jaxrs/client/JaxrsClient.java | 44 +++++++++
.../src/main/resources/microservice.yaml | 7 ++
.../demo/jaxrs/server/RequestClientTimeOut.java | 49 ++++++++++
.../vertx/client/tcp/TcpClientConnection.java | 4 +
.../transport/highway/HighwayClient.java | 15 ++-
.../transport/highway/TestHighwayClient.java | 2 +-
.../rest/client/http/RestClientInvocation.java | 4 +-
9 files changed, 284 insertions(+), 24 deletions(-)
diff --git a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
index 0ea9056..a02f14e 100644
--- a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
+++ b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
@@ -23,13 +23,16 @@ import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.message.BasicNameValuePair;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.Transport;
+import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.exceptions.ServiceCombException;
import org.apache.servicecomb.foundation.common.net.NetUtils;
import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
@@ -46,6 +49,16 @@ import io.vertx.core.Vertx;
public abstract class AbstractTransport implements Transport {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTransport.class);
+ private static final String CONSUMER_REQUEST_TIMEOUT = "cse.request.timeout";
+
+ // key is configuration parameter.
+ private static Map<String, String> cfgCallback = new ConcurrentHashMapEx<>();
+
+ // key is config paramter
+ private static Map<String, AtomicLong> configCenterValue = new ConcurrentHashMapEx<>();
+
+ private static final long REQUEST_TIMEOUT_CFG_FAIL = -1;
+
/*
* 用于参数传递:比如向RestServerVerticle传递endpoint地址。
*/
@@ -53,18 +66,6 @@ public abstract class AbstractTransport implements Transport {
private static final long DEFAULT_TIMEOUT_MILLIS = 30000;
- private static DynamicLongProperty prop = null;
-
- public static DynamicLongProperty getRequestTimeoutProperty() {
- if (prop != null) {
- return prop;
- }
-
- prop = DynamicPropertyFactory.getInstance()
- .getLongProperty("cse.request.timeout", DEFAULT_TIMEOUT_MILLIS);
- return prop;
- }
-
// 所有transport使用同一个vertx实例,避免创建太多的线程
protected Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
@@ -160,4 +161,83 @@ public abstract class AbstractTransport implements Transport {
}
return new URIEndpointObject(address);
}
+
+ /**
+ * Handles the request timeout configurations.
+ *
+ * @param invocation
+ * invocation of request
+ * @return configuration value
+ */
+ public static long getReqTimeout(Invocation invocation) {
+ long value = 0;
+ String config;
+
+ // get the config base on priority. operationName-->schema-->service-->global
+ String operationName = invocation.getOperationName();
+ String schema = invocation.getSchemaId();
+ String serviceName = invocation.getMicroserviceName();
+
+ config = CONSUMER_REQUEST_TIMEOUT + "." + serviceName + "." + schema + "." + operationName;
+ value = getConfigValue(config);
+ if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+ return value;
+ }
+
+ config = CONSUMER_REQUEST_TIMEOUT + "." + serviceName + "." + schema;
+ value = getConfigValue(config);
+ if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+ return value;
+ }
+
+ config = CONSUMER_REQUEST_TIMEOUT + "." + serviceName;
+ value = getConfigValue(config);
+ if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+ return value;
+ }
+
+ value = getConfigValue(CONSUMER_REQUEST_TIMEOUT);
+ if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+ return value;
+ }
+ return DEFAULT_TIMEOUT_MILLIS;
+ }
+
+ /**
+ * Get the configuration value
+ * @param config config parameter
+ * @return long value
+ */
+ private static long getConfigParam(String config, long defaultValue) {
+ DynamicLongProperty dynamicLongProperty = DynamicPropertyFactory.getInstance().getLongProperty(config,
+ defaultValue);
+
+ cfgCallback.computeIfAbsent(config, key -> {
+ dynamicLongProperty.addCallback(() -> {
+ long newValue = dynamicLongProperty.get();
+ String cfgName = dynamicLongProperty.getName();
+
+ //store the value in config center map and check for next requests.
+ configCenterValue.put(cfgName, new AtomicLong(newValue));
+ LOGGER.info("{} changed to {}", cfgName, newValue);
+ });
+ return config;
+ });
+
+ return dynamicLongProperty.get();
+ }
+
+ /**
+ * Get the configuration value
+ * @param config config parameter
+ * @return long value
+ */
+ private static long getConfigValue(String config) {
+ //first need to check in config center map which has high priority.
+ if (configCenterValue.containsKey(config)) {
+ return configCenterValue.get(config).get();
+ }
+
+ return getConfigParam(config, REQUEST_TIMEOUT_CFG_FAIL);
+ }
}
diff --git a/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java b/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java
index 7ddb13e..baf752b 100644
--- a/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java
+++ b/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java
@@ -18,6 +18,7 @@
package org.apache.servicecomb.core.transport;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@@ -32,13 +33,26 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.util.ReflectionUtils;
+
+import com.netflix.config.DynamicProperty;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
public class TestAbstractTransport {
+ private Method updatePropertyMethod =
+ ReflectionUtils.findMethod(DynamicProperty.class, "updateProperty", String.class, Object.class);
+
+ private void updateProperty(String key, Object value) {
+ updatePropertyMethod.setAccessible(true);
+ ReflectionUtils.invokeMethod(updatePropertyMethod, null, key, value);
+ }
+
class MyAbstractTransport extends AbstractTransport {
+
@Override
public String getName() {
return "my";
@@ -125,7 +139,7 @@ public class TestAbstractTransport {
transport.setListenAddressWithoutSchema(null);
Assert.assertNull(transport.getEndpoint().getEndpoint());
Assert.assertNull(transport.parseAddress(null));
- Assert.assertEquals(30000, AbstractTransport.getRequestTimeoutProperty().get());
+ Assert.assertEquals(30000, AbstractTransport.getReqTimeout(Mockito.mock(Invocation.class)));
}
@Test(expected = NumberFormatException.class)
@@ -134,4 +148,67 @@ public class TestAbstractTransport {
transport.setListenAddressWithoutSchema(":127.0.0.1:9090");
}
+
+ /**
+ * Tests the request call timeout for service level timeout value
+ */
+ @Test
+ public void testRequestCfgService() throws Exception {
+ System.setProperty("cse.request.timeout.hello1", "3000");
+ Invocation invocation = Mockito.mock(Invocation.class);
+ Mockito.when(invocation.getOperationName()).thenReturn("sayHello1");
+ Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema1");
+ Mockito.when(invocation.getMicroserviceName()).thenReturn("hello1");
+ //check for service level timeout value
+ Assert.assertEquals(3000, AbstractTransport.getReqTimeout(invocation));
+ System.getProperties().remove("cse.request.timeout.hello1");
+ }
+
+ /**
+ * Tests the request call timeout for schema level timeout value
+ */
+ @Test
+ public void testRequestCfgSchema() throws Exception {
+ System.setProperty("cse.request.timeout.hello2.sayHelloSchema2", "2000");
+ Invocation invocation = Mockito.mock(Invocation.class);
+ Mockito.when(invocation.getOperationName()).thenReturn("sayHello2");
+ Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema2");
+ Mockito.when(invocation.getMicroserviceName()).thenReturn("hello2");
+
+ Assert.assertEquals(2000, AbstractTransport.getReqTimeout(invocation));
+ System.getProperties().remove("cse.request.timeout.hello2.sayHelloSchema2");
+ }
+
+ /**
+ * Tests the request call timeout for operatation level timeout value
+ */
+ @Test
+ public void testRequestCfgOperation() throws Exception {
+ System.setProperty("cse.request.timeout.hello3.sayHelloSchema3.sayHello3", "1000");
+ Invocation invocation = Mockito.mock(Invocation.class);
+ Mockito.when(invocation.getOperationName()).thenReturn("sayHello3");
+ Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema3");
+ Mockito.when(invocation.getMicroserviceName()).thenReturn("hello3");
+
+ Assert.assertEquals(1000, AbstractTransport.getReqTimeout(invocation));
+ System.getProperties().remove("cse.request.timeout.hello3.sayHelloSchema3.sayHello3");
+ }
+
+ /**
+ * Tests the request call timeout with configuration change event for operation level config.
+ */
+ @Test
+ public void testRequestTimeoutCfgEvent() {
+ System.setProperty("cse.request.timeout.hello4.sayHelloSchema4.sayHello4", "1000");
+ Invocation invocation = Mockito.mock(Invocation.class);
+ Mockito.when(invocation.getOperationName()).thenReturn("sayHello4");
+ Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema4");
+ Mockito.when(invocation.getMicroserviceName()).thenReturn("hello4");
+ Assert.assertEquals(1000, AbstractTransport.getReqTimeout(invocation));
+
+ updateProperty("cse.request.timeout.hello4.sayHelloSchema4.sayHello4", 2000);
+
+ Assert.assertEquals(2000, AbstractTransport.getReqTimeout(invocation));
+ System.getProperties().remove("cse.request.timeout.hello4.sayHelloSchema4.sayHello4");
+ }
}
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java
index 5e63c2d..57eda68 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java
@@ -24,6 +24,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import org.apache.servicecomb.common.rest.codec.RestObjectMapper;
+import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.CseContext;
import org.apache.servicecomb.demo.CodeFirstRestTemplate;
import org.apache.servicecomb.demo.DemoConst;
@@ -61,6 +62,7 @@ public class JaxrsClient {
codeFirstClient.testCodeFirst(templateNew, "jaxrs", "/codeFirstJaxrs/");
testCompute(templateNew);
testValidator(templateNew);
+ testClientTimeOut(templateNew);
}
private static void testCompute(RestTemplate template) throws Exception {
@@ -250,4 +252,46 @@ public class JaxrsClient {
Student result = template.postForObject(cseUrlPrefix + "sayhello", student, Student.class);
TestMgr.check("hello test 15", result);
}
+
+ private static void testClientTimeOut(RestTemplate template) throws Exception {
+ String microserviceName = "jaxrs";
+ for (String transport : DemoConst.transports) {
+ if (transport.equals(Const.ANY_TRANSPORT)) {
+ continue;
+ }
+ CseContext.getInstance().getConsumerProviderManager().setTransport(microserviceName, transport);
+ TestMgr.setMsg(microserviceName, transport);
+
+ String cseUrlPrefix = "cse://" + microserviceName + "/clientreqtimeout/";
+
+ testClientTimeoutSayHi(template, cseUrlPrefix);
+ testClientTimeoutAdd(template, cseUrlPrefix);
+ }
+ }
+
+ private static void testClientTimeoutSayHi(RestTemplate template, String cseUrlPrefix) {
+ Student student = new Student();
+ student.setName("timeout");
+ student.setAge(30);
+ Student result = template.postForObject(cseUrlPrefix + "sayhello", student, Student.class);
+ TestMgr.check("hello timeout 30", result);
+ }
+
+ private static void testClientTimeoutAdd(RestTemplate template, String cseUrlPrefix) {
+ Map<String, String> params = new HashMap<>();
+ params.put("a", "5");
+ params.put("b", "20");
+ boolean isExcep = false;
+ try {
+ template.postForObject(cseUrlPrefix + "add", params, Integer.class);
+ } catch (InvocationException e) {
+ isExcep = true;
+ TestMgr.check(490, e.getStatus().getStatusCode());
+ TestMgr.check(
+ "CommonExceptionData [message=Cse Internal Bad Request]",
+ e.getErrorData());
+ }
+
+ TestMgr.check(true, isExcep);
+ }
}
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
index 3defb01..439a27c 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
@@ -32,3 +32,10 @@ servicecomb:
name: mycustomrule
retryEnabled: true
retryHandler: mycustomhandler
+cse:
+ request:
+ timeout:
+ jaxrs:
+ clientreqtimeout:
+ sayHello: 500
+ add: 500
diff --git a/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/RequestClientTimeOut.java b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/RequestClientTimeOut.java
new file mode 100644
index 0000000..83ec4c4
--- /dev/null
+++ b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/RequestClientTimeOut.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.demo.jaxrs.server;
+
+import javax.ws.rs.FormParam;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.servicecomb.demo.validator.Student;
+import org.apache.servicecomb.provider.rest.common.RestSchema;
+
+
+@RestSchema(schemaId = "clientreqtimeout")
+@Path("/clientreqtimeout")
+@Produces(MediaType.APPLICATION_JSON)
+public class RequestClientTimeOut {
+
+ @Path("/add")
+ @POST
+ public int add(@FormParam("a") int a, @FormParam("b") int b) throws InterruptedException {
+ Thread.sleep(1000);
+ return a + b;
+ }
+
+ @Path("/sayhello")
+ @POST
+ public Student sayHello(Student student) throws InterruptedException {
+ student.setName("hello " + student.getName());
+ student.setAge(student.getAge());
+ return student;
+ }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
index bef30e2..95611f7 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
@@ -87,6 +87,10 @@ public class TcpClientConnection extends TcpConnection {
return localSupportLogin;
}
+ public TcpClientConfig getClientConfig() {
+ return clientConfig;
+ }
+
public void setLocalSupportLogin(boolean localSupportLogin) {
this.localSupportLogin = localSupportLogin;
}
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java
index 62f5e46..4ce6d2c 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java
@@ -35,8 +35,6 @@ import org.apache.servicecomb.swagger.invocation.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.config.DynamicLongProperty;
-
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
@@ -63,13 +61,6 @@ public class HighwayClient {
private TcpClientConfig createTcpClientConfig() {
TcpClientConfig tcpClientConfig = new TcpClientConfig();
- DynamicLongProperty prop = AbstractTransport.getRequestTimeoutProperty();
- prop.addCallback(new Runnable() {
- public void run() {
- tcpClientConfig.setRequestTimeoutMillis(prop.get());
- }
- });
- tcpClientConfig.setRequestTimeoutMillis(prop.get());
SSLOptionFactory factory =
SSLOptionFactory.createSSLOptionFactory(SSL_KEY, null);
@@ -93,7 +84,13 @@ public class HighwayClient {
HighwayClientConnection tcpClient =
tcpClientPool.findOrCreateClient(invocation.getEndpoint().getEndpoint());
+
+ //set the timeout based on priority. the priority is follows.
+ //high priotiry: 1) operational level 2)schema level 3) service level 4) global level : low priotiry.
+ TcpClientConfig tcpClientConfig = tcpClient.getClientConfig();
+ tcpClientConfig.setRequestTimeoutMillis(AbstractTransport.getReqTimeout(invocation));
HighwayClientPackage clientPackage = new HighwayClientPackage(invocation, operationProtobuf, tcpClient);
+
LOGGER.debug("Sending request by highway, qualifiedName={}, endpoint={}.",
invocation.getMicroserviceQualifiedName(),
invocation.getEndpoint().getEndpoint());
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java
index 6f9731b..16d06cc 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java
@@ -83,7 +83,7 @@ public class TestHighwayClient {
@Test
public void testRequestTimeout() {
- Assert.assertEquals(AbstractTransport.getRequestTimeoutProperty().get(), 2000);
+ Assert.assertEquals(AbstractTransport.getReqTimeout(invocation), 2000);
}
@Test
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index 180c46f..dcfdb08 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -116,7 +116,9 @@ public class RestClientInvocation {
// 从业务线程转移到网络线程中去发送
httpClientWithContext.runOnContext(httpClient -> {
this.setCseContext();
- clientRequest.setTimeout(AbstractTransport.getRequestTimeoutProperty().get());
+ //set the timeout based on priority. the priority is follows.
+ //high priotiry: 1) operational level 2)schema level 3) service level 4) global level : low priotiry.
+ clientRequest.setTimeout(AbstractTransport.getReqTimeout(invocation));
try {
restClientRequest.end();
} catch (Throwable e) {
--
To stop receiving notification emails like this one, please contact
liubao@apache.org.