You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/06/05 07:20:37 UTC

[incubator-servicecomb-java-chassis] 01/03: Client Request Timeout support for operation/schema/service level

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 906fb55ea57791273764b3c774ef5dc09037bf78
Author: maheshrajus <ma...@huawei.com>
AuthorDate: Thu May 31 12:56:27 2018 +0530

    Client Request Timeout support for operation/schema/service level
---
 .../core/transport/AbstractTransport.java          | 104 ++++++++++++++++++---
 .../core/transport/TestAbstractTransport.java      |  79 +++++++++++++++-
 .../servicecomb/demo/jaxrs/client/JaxrsClient.java |  44 +++++++++
 .../src/main/resources/microservice.yaml           |   7 ++
 .../demo/jaxrs/server/RequestClientTimeOut.java    |  49 ++++++++++
 .../vertx/client/tcp/TcpClientConnection.java      |   4 +
 .../transport/highway/HighwayClient.java           |  15 ++-
 .../transport/highway/TestHighwayClient.java       |   2 +-
 .../rest/client/http/RestClientInvocation.java     |   4 +-
 9 files changed, 284 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
index 0ea9056..a02f14e 100644
--- a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
+++ b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java
@@ -23,13 +23,16 @@ import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import org.apache.http.client.utils.URLEncodedUtils;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.Endpoint;
+import org.apache.servicecomb.core.Invocation;
 import org.apache.servicecomb.core.Transport;
+import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
 import org.apache.servicecomb.foundation.common.exceptions.ServiceCombException;
 import org.apache.servicecomb.foundation.common.net.NetUtils;
 import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
@@ -46,6 +49,16 @@ import io.vertx.core.Vertx;
 public abstract class AbstractTransport implements Transport {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTransport.class);
 
+  private static final String CONSUMER_REQUEST_TIMEOUT = "cse.request.timeout";
+
+  // key is configuration parameter.
+  private static Map<String, String> cfgCallback = new ConcurrentHashMapEx<>();
+
+  // key is config parameter
+  private static Map<String, AtomicLong> configCenterValue = new ConcurrentHashMapEx<>();
+
+  private static final long REQUEST_TIMEOUT_CFG_FAIL = -1;
+
   /*
    * 用于参数传递:比如向RestServerVerticle传递endpoint地址。
    */
@@ -53,18 +66,6 @@ public abstract class AbstractTransport implements Transport {
 
   private static final long DEFAULT_TIMEOUT_MILLIS = 30000;
 
-  private static DynamicLongProperty prop = null;
-
-  public static DynamicLongProperty getRequestTimeoutProperty() {
-    if (prop != null) {
-      return prop;
-    }
-
-    prop = DynamicPropertyFactory.getInstance()
-        .getLongProperty("cse.request.timeout", DEFAULT_TIMEOUT_MILLIS);
-    return prop;
-  }
-
   // 所有transport使用同一个vertx实例,避免创建太多的线程
   protected Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null);
 
@@ -160,4 +161,83 @@ public abstract class AbstractTransport implements Transport {
     }
     return new URIEndpointObject(address);
   }
+
+  /**
+   * Handles the request timeout configurations.
+   * 
+   * @param invocation
+   *            invocation of request
+   * @return configuration value
+   */
+  public static long getReqTimeout(Invocation invocation) {
+    long value = 0;
+    String config;
+
+    // get the config based on priority: operationName-->schema-->service-->global
+    String operationName = invocation.getOperationName();
+    String schema = invocation.getSchemaId();
+    String serviceName = invocation.getMicroserviceName();
+
+    config = CONSUMER_REQUEST_TIMEOUT + "." + serviceName + "." + schema + "." + operationName;
+    value = getConfigValue(config);
+    if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+      return value;
+    }
+
+    config = CONSUMER_REQUEST_TIMEOUT + "." + serviceName + "." + schema;
+    value = getConfigValue(config);
+    if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+      return value;
+    }
+
+    config = CONSUMER_REQUEST_TIMEOUT + "." + serviceName;
+    value = getConfigValue(config);
+    if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+      return value;
+    }
+
+    value = getConfigValue(CONSUMER_REQUEST_TIMEOUT);
+    if ((value != REQUEST_TIMEOUT_CFG_FAIL)) {
+      return value;
+    }
+    return DEFAULT_TIMEOUT_MILLIS;
+  }
+
+  /**
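+   * Reads the dynamic property and, on first use of a key, registers a callback that caches changed values.
+   * @param config config parameter; {@code defaultValue} is the fallback passed to the property lookup
+   * @return current value of the property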
+   */
+  private static long getConfigParam(String config, long defaultValue) {
+    DynamicLongProperty dynamicLongProperty = DynamicPropertyFactory.getInstance().getLongProperty(config,
+        defaultValue);
+
+    cfgCallback.computeIfAbsent(config, key -> {
+      dynamicLongProperty.addCallback(() -> {
+        long newValue = dynamicLongProperty.get();
+        String cfgName = dynamicLongProperty.getName();
+
+        //store the value in config center map and check for next requests.
+        configCenterValue.put(cfgName, new AtomicLong(newValue));
+        LOGGER.info("{} changed to {}", cfgName, newValue);
+      });
+      return config;
+    });
+
+    return dynamicLongProperty.get();
+  }
+
+  /**
+   * Get the configuration value
+   * @param config config parameter
+   * @return long value
+   */
+  private static long getConfigValue(String config) {
+    //first need to check in config center map which has high priority.
+    if (configCenterValue.containsKey(config)) {
+      return configCenterValue.get(config).get();
+    }
+
+    return getConfigParam(config, REQUEST_TIMEOUT_CFG_FAIL);
+  }
 }
diff --git a/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java b/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java
index 7ddb13e..baf752b 100644
--- a/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java
+++ b/core/src/test/java/org/apache/servicecomb/core/transport/TestAbstractTransport.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.core.transport;
 
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -32,13 +33,26 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.util.ReflectionUtils;
+
+import com.netflix.config.DynamicProperty;
 
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mocked;
 
 public class TestAbstractTransport {
+  private Method updatePropertyMethod =
+      ReflectionUtils.findMethod(DynamicProperty.class, "updateProperty", String.class, Object.class);
+
+  private void updateProperty(String key, Object value) {
+    updatePropertyMethod.setAccessible(true);
+    ReflectionUtils.invokeMethod(updatePropertyMethod, null, key, value);
+  }
+
   class MyAbstractTransport extends AbstractTransport {
+
     @Override
     public String getName() {
       return "my";
@@ -125,7 +139,7 @@ public class TestAbstractTransport {
     transport.setListenAddressWithoutSchema(null);
     Assert.assertNull(transport.getEndpoint().getEndpoint());
     Assert.assertNull(transport.parseAddress(null));
-    Assert.assertEquals(30000, AbstractTransport.getRequestTimeoutProperty().get());
+    Assert.assertEquals(30000, AbstractTransport.getReqTimeout(Mockito.mock(Invocation.class)));
   }
 
   @Test(expected = NumberFormatException.class)
@@ -134,4 +148,67 @@ public class TestAbstractTransport {
 
     transport.setListenAddressWithoutSchema(":127.0.0.1:9090");
   }
+
+  /**
+   * Tests the request call timeout for service level timeout value
+   */
+  @Test
+  public void testRequestCfgService() throws Exception {
+    System.setProperty("cse.request.timeout.hello1", "3000");
+    Invocation invocation = Mockito.mock(Invocation.class);
+    Mockito.when(invocation.getOperationName()).thenReturn("sayHello1");
+    Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema1");
+    Mockito.when(invocation.getMicroserviceName()).thenReturn("hello1");
+    //check for service level timeout value
+    Assert.assertEquals(3000, AbstractTransport.getReqTimeout(invocation));
+    System.getProperties().remove("cse.request.timeout.hello1");
+  }
+
+  /**
+   * Tests the request call timeout for schema level timeout value
+   */
+  @Test
+  public void testRequestCfgSchema() throws Exception {
+    System.setProperty("cse.request.timeout.hello2.sayHelloSchema2", "2000");
+    Invocation invocation = Mockito.mock(Invocation.class);
+    Mockito.when(invocation.getOperationName()).thenReturn("sayHello2");
+    Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema2");
+    Mockito.when(invocation.getMicroserviceName()).thenReturn("hello2");
+
+    Assert.assertEquals(2000, AbstractTransport.getReqTimeout(invocation));
+    System.getProperties().remove("cse.request.timeout.hello2.sayHelloSchema2");
+  }
+
+  /**
+   * Tests the request call timeout for operation level timeout value
+   */
+  @Test
+  public void testRequestCfgOperation() throws Exception {
+    System.setProperty("cse.request.timeout.hello3.sayHelloSchema3.sayHello3", "1000");
+    Invocation invocation = Mockito.mock(Invocation.class);
+    Mockito.when(invocation.getOperationName()).thenReturn("sayHello3");
+    Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema3");
+    Mockito.when(invocation.getMicroserviceName()).thenReturn("hello3");
+
+    Assert.assertEquals(1000, AbstractTransport.getReqTimeout(invocation));
+    System.getProperties().remove("cse.request.timeout.hello3.sayHelloSchema3.sayHello3");
+  }
+
+  /**
+   * Tests the request call timeout with configuration change event for operation level config.
+   */
+  @Test
+  public void testRequestTimeoutCfgEvent() {
+    System.setProperty("cse.request.timeout.hello4.sayHelloSchema4.sayHello4", "1000");
+    Invocation invocation = Mockito.mock(Invocation.class);
+    Mockito.when(invocation.getOperationName()).thenReturn("sayHello4");
+    Mockito.when(invocation.getSchemaId()).thenReturn("sayHelloSchema4");
+    Mockito.when(invocation.getMicroserviceName()).thenReturn("hello4");
+    Assert.assertEquals(1000, AbstractTransport.getReqTimeout(invocation));
+
+    updateProperty("cse.request.timeout.hello4.sayHelloSchema4.sayHello4", 2000);
+
+    Assert.assertEquals(2000, AbstractTransport.getReqTimeout(invocation));
+    System.getProperties().remove("cse.request.timeout.hello4.sayHelloSchema4.sayHello4");
+  }
 }
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java
index 5e63c2d..57eda68 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/JaxrsClient.java
@@ -24,6 +24,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.servicecomb.common.rest.codec.RestObjectMapper;
+import org.apache.servicecomb.core.Const;
 import org.apache.servicecomb.core.CseContext;
 import org.apache.servicecomb.demo.CodeFirstRestTemplate;
 import org.apache.servicecomb.demo.DemoConst;
@@ -61,6 +62,7 @@ public class JaxrsClient {
     codeFirstClient.testCodeFirst(templateNew, "jaxrs", "/codeFirstJaxrs/");
     testCompute(templateNew);
     testValidator(templateNew);
+    testClientTimeOut(templateNew);
   }
 
   private static void testCompute(RestTemplate template) throws Exception {
@@ -250,4 +252,46 @@ public class JaxrsClient {
     Student result = template.postForObject(cseUrlPrefix + "sayhello", student, Student.class);
     TestMgr.check("hello test 15", result);
   }
+
+  private static void testClientTimeOut(RestTemplate template) throws Exception {
+    String microserviceName = "jaxrs";
+    for (String transport : DemoConst.transports) {
+      if (transport.equals(Const.ANY_TRANSPORT)) {
+        continue;
+      }
+      CseContext.getInstance().getConsumerProviderManager().setTransport(microserviceName, transport);
+      TestMgr.setMsg(microserviceName, transport);
+
+      String cseUrlPrefix = "cse://" + microserviceName + "/clientreqtimeout/";
+
+      testClientTimeoutSayHi(template, cseUrlPrefix);
+      testClientTimeoutAdd(template, cseUrlPrefix);
+    }
+  }
+
+  private static void testClientTimeoutSayHi(RestTemplate template, String cseUrlPrefix) {
+    Student student = new Student();
+    student.setName("timeout");
+    student.setAge(30);
+    Student result = template.postForObject(cseUrlPrefix + "sayhello", student, Student.class);
+    TestMgr.check("hello timeout 30", result);
+  }
+
+  private static void testClientTimeoutAdd(RestTemplate template, String cseUrlPrefix) {
+    Map<String, String> params = new HashMap<>();
+    params.put("a", "5");
+    params.put("b", "20");
+    boolean isExcep = false;
+    try {
+      template.postForObject(cseUrlPrefix + "add", params, Integer.class);
+    } catch (InvocationException e) {
+      isExcep = true;
+      TestMgr.check(490, e.getStatus().getStatusCode());
+      TestMgr.check(
+          "CommonExceptionData [message=Cse Internal Bad Request]",
+          e.getErrorData());
+    }
+
+    TestMgr.check(true, isExcep);
+  }
 }
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
index 3defb01..439a27c 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
@@ -32,3 +32,10 @@ servicecomb:
       name: mycustomrule
     retryEnabled: true
     retryHandler: mycustomhandler
+cse:
+  request:
+    timeout:
+      jaxrs:
+        clientreqtimeout:
+          sayHello: 500
+          add: 500
diff --git a/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/RequestClientTimeOut.java b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/RequestClientTimeOut.java
new file mode 100644
index 0000000..83ec4c4
--- /dev/null
+++ b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/RequestClientTimeOut.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.demo.jaxrs.server;
+
+import javax.ws.rs.FormParam;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.servicecomb.demo.validator.Student;
+import org.apache.servicecomb.provider.rest.common.RestSchema;
+
+
+@RestSchema(schemaId = "clientreqtimeout")
+@Path("/clientreqtimeout")
+@Produces(MediaType.APPLICATION_JSON)
+public class RequestClientTimeOut {
+
+  @Path("/add")
+  @POST
+  public int add(@FormParam("a") int a, @FormParam("b") int b) throws InterruptedException {
+    Thread.sleep(1000);
+    return a + b;
+  }
+
+  @Path("/sayhello")
+  @POST
+  public Student sayHello(Student student) throws InterruptedException {
+    student.setName("hello " + student.getName());
+    student.setAge(student.getAge());
+    return student;
+  }
+}
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
index bef30e2..95611f7 100644
--- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/client/tcp/TcpClientConnection.java
@@ -87,6 +87,10 @@ public class TcpClientConnection extends TcpConnection {
     return localSupportLogin;
   }
 
+  public TcpClientConfig getClientConfig() {
+    return clientConfig;
+  }
+
   public void setLocalSupportLogin(boolean localSupportLogin) {
     this.localSupportLogin = localSupportLogin;
   }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java
index 62f5e46..4ce6d2c 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayClient.java
@@ -35,8 +35,6 @@ import org.apache.servicecomb.swagger.invocation.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.netflix.config.DynamicLongProperty;
-
 import io.vertx.core.DeploymentOptions;
 import io.vertx.core.Vertx;
 
@@ -63,13 +61,6 @@ public class HighwayClient {
 
   private TcpClientConfig createTcpClientConfig() {
     TcpClientConfig tcpClientConfig = new TcpClientConfig();
-    DynamicLongProperty prop = AbstractTransport.getRequestTimeoutProperty();
-    prop.addCallback(new Runnable() {
-      public void run() {
-        tcpClientConfig.setRequestTimeoutMillis(prop.get());
-      }
-    });
-    tcpClientConfig.setRequestTimeoutMillis(prop.get());
 
     SSLOptionFactory factory =
         SSLOptionFactory.createSSLOptionFactory(SSL_KEY, null);
@@ -93,7 +84,13 @@ public class HighwayClient {
 
     HighwayClientConnection tcpClient =
         tcpClientPool.findOrCreateClient(invocation.getEndpoint().getEndpoint());
+
+    //set the timeout based on priority. the priority is as follows.
+    //high priority: 1) operation level 2) schema level 3) service level 4) global level : low priority.
+    TcpClientConfig tcpClientConfig = tcpClient.getClientConfig();
+    tcpClientConfig.setRequestTimeoutMillis(AbstractTransport.getReqTimeout(invocation));
     HighwayClientPackage clientPackage = new HighwayClientPackage(invocation, operationProtobuf, tcpClient);
+
     LOGGER.debug("Sending request by highway, qualifiedName={}, endpoint={}.",
         invocation.getMicroserviceQualifiedName(),
         invocation.getEndpoint().getEndpoint());
diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java
index 6f9731b..16d06cc 100644
--- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java
+++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayClient.java
@@ -83,7 +83,7 @@ public class TestHighwayClient {
 
   @Test
   public void testRequestTimeout() {
-    Assert.assertEquals(AbstractTransport.getRequestTimeoutProperty().get(), 2000);
+    Assert.assertEquals(AbstractTransport.getReqTimeout(invocation), 2000);
   }
 
   @Test
diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
index 180c46f..dcfdb08 100644
--- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
+++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java
@@ -116,7 +116,9 @@ public class RestClientInvocation {
     // 从业务线程转移到网络线程中去发送
     httpClientWithContext.runOnContext(httpClient -> {
       this.setCseContext();
-      clientRequest.setTimeout(AbstractTransport.getRequestTimeoutProperty().get());
+      //set the timeout based on priority. the priority is as follows.
+      //high priority: 1) operation level 2) schema level 3) service level 4) global level : low priority.
+      clientRequest.setTimeout(AbstractTransport.getReqTimeout(invocation));
       try {
         restClientRequest.end();
       } catch (Throwable e) {

-- 
To stop receiving notification emails like this one, please contact
liubao@apache.org.