You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/08/14 06:14:28 UTC

[incubator-servicecomb-java-chassis] 01/02: [SCB-833]Provide a retry mechanism to meet upgrade no interrupt

This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git

commit 38b48aee22718f851b1a42db109f071542d97e65
Author: liubao <ba...@huawei.com>
AuthorDate: Fri Aug 10 21:37:43 2018 +0800

    [SCB-833]Provide a retry mechanism to meet upgrade no interrupt
---
 .../org/apache/servicecomb/core/SCBEngine.java     |  9 ++-
 .../org/apache/servicecomb/core/TestSCBEngine.java | 11 ++--
 .../core/provider/consumer/TestInvokerUtils.java   | 10 +--
 .../client/CodeFirstRestTemplateSpringmvc.java     | 19 +++---
 .../demo/springmvc/server/CodeFirstSpringmvc.java  | 12 ++++
 .../loadbalance/DefaultRetryExtensionsFactory.java | 23 ++++++-
 .../loadbalance/LoadbalanceHandler.java            | 73 ++++++++++++++++++++--
 .../loadbalance/TestExtensionsManager.java         |  3 +
 .../swagger/engine/SwaggerEnvironment.java         |  6 +-
 9 files changed, 137 insertions(+), 29 deletions(-)

diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
index 6c5ccb3..6e0aefe 100644
--- a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
+++ b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
@@ -20,6 +20,8 @@ import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.servicecomb.config.ConfigUtil;
 import org.apache.servicecomb.core.BootListener.BootEvent;
 import org.apache.servicecomb.core.BootListener.EventType;
@@ -36,6 +38,7 @@ import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.vertx.VertxUtils;
 import org.apache.servicecomb.serviceregistry.RegistryUtils;
 import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceRegisterTask;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
@@ -277,8 +280,10 @@ public class SCBEngine {
   public void ensureStatusUp() {
     SCBStatus currentStatus = getStatus();
     if (!SCBStatus.UP.equals(currentStatus)) {
-      throw new IllegalStateException(
-          "The request is rejected, as the service cannot process the request due to STATUS = " + currentStatus);
+      String message =
+          "The request is rejected. Cannot process the request due to STATUS = " + currentStatus;
+      LOGGER.warn(message);
+      throw new InvocationException(Status.SERVICE_UNAVAILABLE, message);
     }
   }
 
diff --git a/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java b/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java
index 89ffe80..177a46b 100644
--- a/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java
+++ b/core/src/test/java/org/apache/servicecomb/core/TestSCBEngine.java
@@ -29,6 +29,7 @@ import org.apache.servicecomb.core.transport.TransportManager;
 import org.apache.servicecomb.foundation.vertx.VertxUtils;
 import org.apache.servicecomb.serviceregistry.RegistryUtils;
 import org.apache.servicecomb.serviceregistry.consumer.AppManager;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -104,9 +105,10 @@ public class TestSCBEngine {
     engine.setStatus(SCBStatus.DOWN);
     engine.setConsumerProviderManager(consumerProviderManager);
 
-    expectedException.expect(IllegalStateException.class);
+    expectedException.expect(InvocationException.class);
     expectedException.expectMessage(
-        Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+        Matchers
+            .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
     engine.createReferenceConfigForInvoke(null, null, null);
   }
 
@@ -126,9 +128,10 @@ public class TestSCBEngine {
     engine.setStatus(SCBStatus.DOWN);
     engine.setConsumerProviderManager(consumerProviderManager);
 
-    expectedException.expect(IllegalStateException.class);
+    expectedException.expect(InvocationException.class);
     expectedException.expectMessage(
-        Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+        Matchers
+            .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
     engine.getReferenceConfigForInvoke(null);
   }
 }
diff --git a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java
index ef0b021..04e32b2 100644
--- a/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java
+++ b/core/src/test/java/org/apache/servicecomb/core/provider/consumer/TestInvokerUtils.java
@@ -124,9 +124,10 @@ public class TestInvokerUtils {
   public void testSyncInvoke_4param_NotReady() {
     scbEngine.setStatus(SCBStatus.DOWN);
 
-    expectedException.expect(IllegalStateException.class);
+    expectedException.expect(InvocationException.class);
     expectedException.expectMessage(
-        Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+        Matchers
+            .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
     InvokerUtils.syncInvoke("ms", "schemaId", "opName", null);
   }
 
@@ -134,9 +135,10 @@ public class TestInvokerUtils {
   public void testSyncInvoke_6param_NotReady() {
     scbEngine.setStatus(SCBStatus.DOWN);
 
-    expectedException.expect(IllegalStateException.class);
+    expectedException.expect(InvocationException.class);
     expectedException.expectMessage(
-        Matchers.is("The request is rejected, as the service cannot process the request due to STATUS = DOWN"));
+        Matchers
+            .is("InvocationException: code=503;msg=CommonExceptionData [message=The request is rejected. Cannot process the request due to STATUS = DOWN]"));
 
     InvokerUtils.syncInvoke("ms", "latest", "rest", "schemaId", "opName", null);
   }
diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
index 2525f87..af27ccd 100644
--- a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
+++ b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
@@ -137,17 +137,13 @@ public class CodeFirstRestTemplateSpringmvc extends CodeFirstRestTemplate {
     String result = testRestTemplateUpload(template, cseUrlPrefix, file1, someFile);
     TestMgr.check(expect, result);
 
-    {
-      MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
-      map.add("file1", new FileSystemResource(file1));
-
-      result = template.postForObject(
-          cseUrlPrefix + "/upload1",
-          new HttpEntity<>(map),
-          String.class);
+    MultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
+    map.add("file1", new FileSystemResource(file1));
 
-      System.out.println(result);
-    }
+    result = template.postForObject(
+        cseUrlPrefix + "/upload1",
+        new HttpEntity<>(map),
+        String.class);
 
     result = uploadPartAndFile.fileUpload(new FilePart(null, file1), someFile);
     TestMgr.check(expect, result);
@@ -228,6 +224,9 @@ public class CodeFirstRestTemplateSpringmvc extends CodeFirstRestTemplate {
     TestMgr.check("h1v " + srcName, responseEntity.getHeaders().getFirst("h1"));
     TestMgr.check("h2v " + srcName, responseEntity.getHeaders().getFirst("h2"));
     checkStatusCode(microserviceName, 202, responseEntity.getStatusCode());
+
+    int retryResult = template.getForObject(cseUrlPrefix + "retrySuccess?a=2&b=3", Integer.class);
+    TestMgr.check(retryResult, 5);
   }
 
   protected void testCodeFirstTestForm(RestTemplate template, String cseUrlPrefix) {
diff --git a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
index 01a2158..74f8b11 100644
--- a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
+++ b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
@@ -22,6 +22,8 @@ import java.io.InputStream;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.Part;
@@ -87,6 +89,8 @@ import io.vertx.core.json.JsonObject;
 public class CodeFirstSpringmvc {
   private static final Logger LOGGER = LoggerFactory.getLogger(CodeFirstSpringmvc.class);
 
+  private AtomicInteger firstInovcation = new AtomicInteger(2);
+
   private String _fileUpload(MultipartFile file1, Part file2) {
     try (InputStream is1 = file1.getInputStream(); InputStream is2 = file2.getInputStream()) {
       String content1 = IOUtils.toString(is1);
@@ -104,6 +108,14 @@ public class CodeFirstSpringmvc {
     }
   }
 
+  @GetMapping(path = "/retrySuccess")
+  public int retrySuccess(@RequestParam("a") int a, @RequestParam("b") int b) {
+    if (firstInovcation.getAndDecrement() > 0) {
+      throw new InvocationException(Status.SERVICE_UNAVAILABLE, "try again later.");
+    }
+    return a + b;
+  }
+
   @PostMapping(path = "/upload1", produces = MediaType.TEXT_PLAIN_VALUE)
   public String fileUpload1(@RequestPart(name = "file1") MultipartFile file1) throws IOException {
     try (InputStream is = file1.getInputStream()) {
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
index 546c3e3..688d7a2 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
@@ -16,8 +16,13 @@
  */
 package org.apache.servicecomb.loadbalance;
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
 import java.util.Collection;
+import java.util.List;
 
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.springframework.stereotype.Component;
 
 import com.google.common.collect.Lists;
@@ -44,9 +49,25 @@ public class DefaultRetryExtensionsFactory implements ExtensionsFactory {
     RetryHandler handler = new DefaultLoadBalancerRetryHandler(
         Configuration.INSTANCE.getRetryOnSame(microservice),
         Configuration.INSTANCE.getRetryOnNext(microservice), true) {
+      private List<Class<? extends Throwable>> retriable = Lists
+          .newArrayList(new Class[] {ConnectException.class, SocketTimeoutException.class, IOException.class});
+
       @Override
       public boolean isRetriableException(Throwable e, boolean sameServer) {
-        return Utils.isPresentAsCause(e, getRetriableExceptions());
+        boolean retriable = Utils.isPresentAsCause(e, getRetriableExceptions());
+        if (!retriable) {
+          if (e instanceof InvocationException) {
+            if (((InvocationException) e).getStatusCode() == 503) {
+              return true;
+            }
+          }
+        }
+        return retriable;
+      }
+
+      @Override
+      protected List<Class<? extends Throwable>> getRetriableExceptions() {
+        return this.retriable;
       }
     };
     return handler;
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
index 9750f19..7d6394a 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
@@ -44,6 +44,7 @@ import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.netflix.loadbalancer.ILoadBalancer;
 import com.netflix.loadbalancer.IRule;
 import com.netflix.loadbalancer.Server;
 import com.netflix.loadbalancer.reactive.ExecutionContext;
@@ -59,6 +60,60 @@ import rx.Observable;
  *
  */
 public class LoadbalanceHandler implements Handler {
+  // Wrapper that makes a retry choose a different server from the one chosen last time.
+  class RetryLoadBalancer implements ILoadBalancer {
+    // Max attempts to pick a server different from the last one; meant to be enough under high volume.
+    static final int COUNT = 17;
+
+    Server lastServer = null;
+
+    ILoadBalancer delegate;
+
+    RetryLoadBalancer(ILoadBalancer delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void addServers(List<Server> newServers) {
+      delegate.addServers(newServers);
+    }
+
+    @Override
+    public Server chooseServer(Object key) {
+      for (int i = 0; i < COUNT; i++) {
+        Server s = delegate.chooseServer(key);
+        if (s != null && !s.equals(lastServer)) {
+          lastServer = s;
+          break;
+        }
+      }
+
+      return lastServer;
+    }
+
+
+    @Override
+    public void markServerDown(Server server) {
+      delegate.markServerDown(server);
+    }
+
+    @Override
+    @Deprecated
+    public List<Server> getServerList(boolean availableOnly) {
+      return delegate.getServerList(availableOnly);
+    }
+
+    @Override
+    public List<Server> getReachableServers() {
+      return delegate.getReachableServers();
+    }
+
+    @Override
+    public List<Server> getAllServers() {
+      return delegate.getAllServers();
+    }
+  }
+
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadbalanceHandler.class);
 
   private static final ExecutorService RETRY_POOL = Executors.newCachedThreadPool(new ThreadFactory() {
@@ -190,15 +245,22 @@ public class LoadbalanceHandler implements Handler {
       @Override
       public void onExceptionWithServer(ExecutionContext<Invocation> context, Throwable exception,
           ExecutionInfo info) {
-        LOGGER.error("onExceptionWithServer operation {}; msg {}; server {}",
+        LOGGER.error("Invoke server failed. Operation {}; server {}; {}-{} msg {}",
             context.getRequest().getInvocationQualifiedName(),
-            exception.getMessage(),
-            context.getRequest().getEndpoint());
+            context.getRequest().getEndpoint(),
+            info.getNumberOfPastServersAttempted(),
+            info.getNumberOfPastAttemptsOnServer(),
+            exception.getMessage());
       }
 
       @Override
       public void onExecutionSuccess(ExecutionContext<Invocation> context, Response response,
           ExecutionInfo info) {
+        if (info.getNumberOfPastServersAttempted() > 0 || info.getNumberOfPastAttemptsOnServer() > 0) {
+          LOGGER.error("Invoke server success. Operation {}; server {}",
+              context.getRequest().getInvocationQualifiedName(),
+              context.getRequest().getEndpoint());
+        }
         if (orginExecutor != null) {
           orginExecutor.execute(() -> {
             asyncResp.complete(response);
@@ -225,7 +287,7 @@ public class LoadbalanceHandler implements Handler {
     ExecutionContext<Invocation> context = new ExecutionContext<>(invocation, null, null, null);
 
     LoadBalancerCommand<Response> command = LoadBalancerCommand.<Response>builder()
-        .withLoadBalancer(chosenLB)
+        .withLoadBalancer(new RetryLoadBalancer(chosenLB))
         .withServerLocator(invocation)
         .withRetryHandler(ExtensionsManager.createRetryHandler(invocation.getMicroserviceName()))
         .withListeners(listeners)
@@ -276,7 +338,8 @@ public class LoadbalanceHandler implements Handler {
     if (resp.isFailed()) {
       if (InvocationException.class.isInstance(resp.getResult())) {
         InvocationException e = (InvocationException) resp.getResult();
-        return e.getStatusCode() == ExceptionFactory.CONSUMER_INNER_STATUS_CODE;
+        return e.getStatusCode() == ExceptionFactory.CONSUMER_INNER_STATUS_CODE
+            || e.getStatusCode() == 503;
       } else {
         return true;
       }
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
index 539052d..c0070ca 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.servicecomb.loadbalance;
 
+import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
@@ -117,9 +118,11 @@ public class TestExtensionsManager {
     Assert.assertTrue(DefaultLoadBalancerRetryHandler.class.isInstance(retryHandler));
     Assert.assertFalse(retryHandler.isRetriableException(new InvocationException(400, "", ""), false));
     Assert.assertFalse(retryHandler.isRetriableException(new InvocationException(400, "", ""), true));
+    Assert.assertTrue(retryHandler.isRetriableException(new InvocationException(503, "", ""), true));
     Assert.assertTrue(retryHandler.isRetriableException(new ConnectException(), false));
     Assert.assertTrue(retryHandler.isRetriableException(new ConnectException(), true));
     Assert.assertTrue(retryHandler.isRetriableException(new SocketTimeoutException(), false));
     Assert.assertTrue(retryHandler.isRetriableException(new SocketTimeoutException(), true));
+    Assert.assertTrue(retryHandler.isRetriableException(new IOException(), true));
   }
 }
diff --git a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java
index 42852c1..8a5aa2d 100644
--- a/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java
+++ b/swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/engine/SwaggerEnvironment.java
@@ -171,9 +171,9 @@ public class SwaggerEnvironment {
       Method producerMethod = visibleProducerMethods.getOrDefault(methodName, null);
       if (producerMethod == null) {
         // producer未实现契约,非法
-        String msg = String.format("swagger method %s:%s not exist in producer.",
-            swaggerIntf.getClass().getName(),
-            methodName);
+        String msg = String.format("swagger method %s not exist in producer %s.",
+            methodName,
+            producerInstance.getClass().getName());
         throw new Error(msg);
       }