You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/08/14 06:14:28 UTC
[incubator-servicecomb-java-chassis] 01/02: [SCB-833]Provide a
retry mechanism to meet upgrade no interrupt
This is an automated email from the ASF dual-hosted git repository.
liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 38b48aee22718f851b1a42db109f071542d97e65
Author: liubao <ba...@huawei.com>
AuthorDate: Fri Aug 10 21:37:43 2018 +0800
[SCB-833]Provide a retry mechanism to meet upgrade no interrupt
---
.../org/apache/servicecomb/core/SCBEngine.java | 9 ++-
.../org/apache/servicecomb/core/TestSCBEngine.java | 11 ++--
.../core/provider/consumer/TestInvokerUtils.java | 10 +--
.../client/CodeFirstRestTemplateSpringmvc.java | 19 +++---
.../demo/springmvc/server/CodeFirstSpringmvc.java | 12 ++++
.../loadbalance/DefaultRetryExtensionsFactory.java | 23 ++++++-
.../loadbalance/LoadbalanceHandler.java | 73 ++++++++++++++++++++--
.../loadbalance/TestExtensionsManager.java | 3 +
.../swagger/engine/SwaggerEnvironment.java | 6 +-
9 files changed, 137 insertions(+), 29 deletions(-)
diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
index 6c5ccb3..6e0aefe 100644
--- a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
+++ b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
@@ -20,6 +20,8 @@ import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.ws.rs.core.Response.Status;
+
import org.apache.servicecomb.config.ConfigUtil;
import org.apache.servicecomb.core.BootListener.BootEvent;
import org.apache.servicecomb.core.BootListener.EventType;
@@ -36,6 +38,7 @@ import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.foundation.vertx.VertxUtils;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceRegisterTask;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
@@ -277,8 +280,10 @@ public class SCBEngine {
public void ensureStatusUp() {
SCBStatus currentStatus = getStatus();
if (!SCBStatus.UP.equals(currentStatus)) {
- throw new IllegalStateException(
- "The request is rejected, as the service cannot process the request due to STATUS = " + currentStatus);
+ String message =
+ "The request is rejected. Cannot process the request due to STATUS = " + currentStatus;
+ LOGGER.warn(message);
+ throw new InvocationException(Status.SERVICE_UNAVAILABLE, message);
}
}
diff --git a/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java b/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java
index 89ffe80..177a46b 100644
--- a/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java
+++ b/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java
@@ -29,6 +29,7 @@ import org.apache.servicecomb.core.transport.TransportManager;
import org.apache.servicecomb.foundation.vertx.VertxUtils;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.consumer.AppManager;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
@@ -104,9 +105,10 @@ public class TestSCBEngine {
engine.setStatus(SCBStatus.DOWN);
engine.setConsumerProviderManager(consumerProviderManager);
- expectedException.expect(IllegalStateException.class);
+ expectedException.expect(InvocationException.class);
expectedException.expectMessage(
- Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+ Matchers
+ .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
engine.createReferenceConfigForInvoke(null, null, null);
}
@@ -126,9 +128,10 @@ public class TestSCBEngine {
engine.setStatus(SCBStatus.DOWN);
engine.setConsumerProviderManager(consumerProviderManager);
- expectedException.expect(IllegalStateException.class);
+ expectedException.expect(InvocationException.class);
expectedException.expectMessage(
- Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+ Matchers
+ .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
engine.getReferenceConfigForInvoke(null);
}
}
diff --git a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java
index ef0b021..04e32b2 100644
--- a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java
+++ b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java
@@ -124,9 +124,10 @@ public class TestInvokerUtils {
public void testSyncInvoke_4param_NotReady() {
scbEngine.setStatus(SCBStatus.DOWN);
- expectedException.expect(IllegalStateException.class);
+ expectedException.expect(InvocationException.class);
expectedException.expectMessage(
- Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+ Matchers
+ .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
InvokerUtils.syncInvoke("ms", "schemaId", "opName", null);
}
@@ -134,9 +135,10 @@ public class TestInvokerUtils {
public void testSyncInvoke_6param_NotReady() {
scbEngine.setStatus(SCBStatus.DOWN);
- expectedException.expect(IllegalStateException.class);
+ expectedException.expect(InvocationException.class);
expectedException.expectMessage(
- Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+ Matchers
+ .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
InvokerUtils.syncInvoke("ms", "latest", "rest", "schemaId", "opName", null);
}
diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
index 2525f87..af27ccd 100644
--- a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
+++ b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
@@ -137,17 +137,13 @@ public class CodeFirstRestTemplateSpringmvc extends CodeFirstRestTemplate {
String result = testRestTemplateUpload(template, cseUrlPrefix, file1, someFile);
TestMgr.check(expect, result);
- {
- MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
- map.add("file1", new FileSystemResource(file1));
-
- result = template.postForObject(
- cseUrlPrefix + "/upload1",
- new HttpEntity<>(map),
- String.class);
+ MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
+ map.add("file1", new FileSystemResource(file1));
- System.out.println(result);
- }
+ result = template.postForObject(
+ cseUrlPrefix + "/upload1",
+ new HttpEntity<>(map),
+ String.class);
result = uploadPartAndFile.fileUpload(new FilePart(null, file1), someFile);
TestMgr.check(expect, result);
@@ -228,6 +224,9 @@ public class CodeFirstRestTemplateSpringmvc extends CodeFirstRestTemplate {
TestMgr.check("h1v " + srcName, responseEntity.getHeaders().getFirst("h1"));
TestMgr.check("h2v " + srcName, responseEntity.getHeaders().getFirst("h2"));
checkStatusCode(microserviceName, 202, responseEntity.getStatusCode());
+
+ int retryResult = template.getForObject(cseUrlPrefix + "retrySuccess?a=2&b=3", Integer.class);
+ TestMgr.check(retryResult, 5);
}
protected void testCodeFirstTestForm(RestTemplate template, String cseUrlPrefix) {
diff --git a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
index 01a2158..74f8b11 100644
--- a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
+++ b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.Part;
@@ -87,6 +89,8 @@ import io.vertx.core.json.JsonObject;
public class CodeFirstSpringmvc {
private static final Logger LOGGER = LoggerFactory.getLogger(CodeFirstSpringmvc.class);
+ private AtomicInteger firstInovcation = new AtomicInteger(2);
+
private String _fileUpload(MultipartFile file1, Part file2) {
try (InputStream is1 = file1.getInputStream(); InputStream is2 = file2.getInputStream()) {
String content1 = IOUtils.toString(is1);
@@ -104,6 +108,14 @@ public class CodeFirstSpringmvc {
}
}
+ @GetMapping(path = "/retrySuccess")
+ public int retrySuccess(@RequestParam("a") int a, @RequestParam("b") int b) {
+ if (firstInovcation.getAndDecrement() > 0) {
+ throw new InvocationException(Status.SERVICE_UNAVAILABLE, "try again later.");
+ }
+ return a + b;
+ }
+
@PostMapping(path = "/upload1", produces = MediaType.TEXT_PLAIN_VALUE)
public String fileUpload1(@RequestPart(name = "file1") MultipartFile file1) throws IOException {
try (InputStream is = file1.getInputStream()) {
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
index 546c3e3..688d7a2 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
@@ -16,8 +16,13 @@
*/
package org.apache.servicecomb.loadbalance;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
import java.util.Collection;
+import java.util.List;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.springframework.stereotype.Component;
import com.google.common.collect.Lists;
@@ -44,9 +49,25 @@ public class DefaultRetryExtensionsFactory implements ExtensionsFactory {
RetryHandler handler = new DefaultLoadBalancerRetryHandler(
Configuration.INSTANCE.getRetryOnSame(microservice),
Configuration.INSTANCE.getRetryOnNext(microservice), true) {
+ private List<Class<? extends Throwable>> retriable = Lists
+ .newArrayList(new Class[] {ConnectException.class, SocketTimeoutException.class, IOException.class});
+
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
- return Utils.isPresentAsCause(e, getRetriableExceptions());
+ boolean retriable = Utils.isPresentAsCause(e, getRetriableExceptions());
+ if (!retriable) {
+ if (e instanceof InvocationException) {
+ if (((InvocationException) e).getStatusCode() == 503) {
+ return true;
+ }
+ }
+ }
+ return retriable;
+ }
+
+ @Override
+ protected List<Class<? extends Throwable>> getRetriableExceptions() {
+ return this.retriable;
}
};
return handler;
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
index 9750f19..7d6394a 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
@@ -44,6 +44,7 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.reactive.ExecutionContext;
@@ -59,6 +60,60 @@ import rx.Observable;
*
*/
public class LoadbalanceHandler implements Handler {
+ // just a wrapper to make sure in retry mode to choose a different server.
+ class RetryLoadBalancer implements ILoadBalancer {
+ // Enough times to make sure to choose a different server in high volume.
+ static final int COUNT = 17;
+
+ Server lastServer = null;
+
+ ILoadBalancer delegate;
+
+ RetryLoadBalancer(ILoadBalancer delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void addServers(List<Server> newServers) {
+ delegate.addServers(newServers);
+ }
+
+ @Override
+ public Server chooseServer(Object key) {
+ for (int i = 0; i < COUNT; i++) {
+ Server s = delegate.chooseServer(key);
+ if (s != null && !s.equals(lastServer)) {
+ lastServer = s;
+ break;
+ }
+ }
+
+ return lastServer;
+ }
+
+
+ @Override
+ public void markServerDown(Server server) {
+ delegate.markServerDown(server);
+ }
+
+ @Override
+ @Deprecated
+ public List<Server> getServerList(boolean availableOnly) {
+ return delegate.getServerList(availableOnly);
+ }
+
+ @Override
+ public List<Server> getReachableServers() {
+ return delegate.getReachableServers();
+ }
+
+ @Override
+ public List<Server> getAllServers() {
+ return delegate.getAllServers();
+ }
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(LoadbalanceHandler.class);
private static final ExecutorService RETRY_POOL = Executors.newCachedThreadPool(new ThreadFactory() {
@@ -190,15 +245,22 @@ public class LoadbalanceHandler implements Handler {
@Override
public void onExceptionWithServer(ExecutionContext<Invocation> context, Throwable exception,
ExecutionInfo info) {
- LOGGER.error("onExceptionWithServer operation {}; msg {}; server {}",
+ LOGGER.error("Invoke server failed. Operation {}; server {}; {}-{} msg {}",
context.getRequest().getInvocationQualifiedName(),
- exception.getMessage(),
- context.getRequest().getEndpoint());
+ context.getRequest().getEndpoint(),
+ info.getNumberOfPastServersAttempted(),
+ info.getNumberOfPastAttemptsOnServer(),
+ exception.getMessage());
}
@Override
public void onExecutionSuccess(ExecutionContext<Invocation> context, Response response,
ExecutionInfo info) {
+ if (info.getNumberOfPastServersAttempted() > 0 || info.getNumberOfPastAttemptsOnServer() > 0) {
+ LOGGER.error("Invoke server success. Operation {}; server {}",
+ context.getRequest().getInvocationQualifiedName(),
+ context.getRequest().getEndpoint());
+ }
if (orginExecutor != null) {
orginExecutor.execute(() -> {
asyncResp.complete(response);
@@ -225,7 +287,7 @@ public class LoadbalanceHandler implements Handler {
ExecutionContext<Invocation> context = new ExecutionContext<>(invocation, null, null, null);
LoadBalancerCommand<Response> command = LoadBalancerCommand.<Response>builder()
- .withLoadBalancer(chosenLB)
+ .withLoadBalancer(new RetryLoadBalancer(chosenLB))
.withServerLocator(invocation)
.withRetryHandler(ExtensionsManager.createRetryHandler(invocation.getMicroserviceName()))
.withListeners(listeners)
@@ -276,7 +338,8 @@ public class LoadbalanceHandler implements Handler {
if (resp.isFailed()) {
if (InvocationException.class.isInstance(resp.getResult())) {
InvocationException e = (InvocationException) resp.getResult();
- return e.getStatusCode() == ExceptionFactory.CONSUMER_INNER_STATUS_CODE;
+ return e.getStatusCode() == ExceptionFactory.CONSUMER_INNER_STATUS_CODE
+ || e.getStatusCode() == 503;
} else {
return true;
}
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
index 539052d..c0070ca 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.servicecomb.loadbalance;
+import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
@@ -117,9 +118,11 @@ public class TestExtensionsManager {
Assert.assertTrue(DefaultLoadBalancerRetryHandler.class.isInstance(retryHandler));
Assert.assertFalse(retryHandler.isRetriableException(new InvocationException(400, "", ""), false));
Assert.assertFalse(retryHandler.isRetriableException(new InvocationException(400, "", ""), true));
+ Assert.assertTrue(retryHandler.isRetriableException(new InvocationException(503, "", ""), true));
Assert.assertTrue(retryHandler.isRetriableException(new ConnectException(), false));
Assert.assertTrue(retryHandler.isRetriableException(new ConnectException(), true));
Assert.assertTrue(retryHandler.isRetriableException(new SocketTimeoutException(), false));
Assert.assertTrue(retryHandler.isRetriableException(new SocketTimeoutException(), true));
+ Assert.assertTrue(retryHandler.isRetriableException(new IOException(), true));
}
}
diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java
index 42852c1..8a5aa2d 100644
--- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java
+++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java
@@ -171,9 +171,9 @@ public class SwaggerEnvironment {
Method producerMethod = visibleProducerMethods.getOrDefault(methodName, null);
if (producerMethod == null) {
// producer未实现契约,非法
- String msg = String.format("swagger method %s:%s not exist in producer.",
- swaggerIntf.getClass().getName(),
- methodName);
+ String msg = String.format("swagger method %s not exist in producer %s.",
+ methodName,
+ producerInstance.getClass().getName());
throw new Error(msg);
}