You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2019/04/01 14:33:33 UTC

[servicecomb-java-chassis] branch master updated: [SCB-1226] there are problems when request rejected by thread pool queue full

This is an automated email from the ASF dual-hosted git repository.

wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d53ef2  [SCB-1226] there are problems when request rejected by thread pool queue full
6d53ef2 is described below

commit 6d53ef26742f9031a4bda26b6b240e7715981f53
Author: wujimin <wu...@huawei.com>
AuthorDate: Mon Apr 1 15:58:35 2019 +0800

    [SCB-1226] there are problems when request rejected by thread pool queue full
---
 .../common/rest/AbstractRestInvocation.java        | 49 ++++++++++++----------
 .../common/rest/TestAbstractRestInvocation.java    | 32 +++++++++++++-
 .../org/apache/servicecomb/core/SCBEngine.java     |  8 +++-
 .../transport/highway/HighwayServerInvoke.java     |  6 ++-
 4 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
index f80309d..22cbc53 100644
--- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
+++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/AbstractRestInvocation.java
@@ -119,7 +119,7 @@ public abstract class AbstractRestInvocation {
   protected void scheduleInvocation() {
     try {
       createInvocation();
-    } catch (IllegalStateException e) {
+    } catch (Throwable e) {
       sendFailResponse(e);
       return;
     }
@@ -141,28 +141,33 @@ public abstract class AbstractRestInvocation {
       return;
     }
 
-    operationMeta.getExecutor().execute(() -> {
-      synchronized (this.requestEx) {
-        try {
-          if (isInQueueTimeout()) {
-            throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
-          }
-          if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
-            // already timeout
-            // in this time, request maybe recycled and reused by web container, do not use requestEx
-            LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
-                operationMeta.getHttpMethod(),
-                operationMeta.getMicroserviceQualifiedName());
-            return;
+    try {
+      operationMeta.getExecutor().execute(() -> {
+        synchronized (this.requestEx) {
+          try {
+            if (isInQueueTimeout()) {
+              throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
+            }
+            if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
+              // already timeout
+              // in this time, request maybe recycled and reused by web container, do not use requestEx
+              LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
+                  operationMeta.getHttpMethod(),
+                  operationMeta.getMicroserviceQualifiedName());
+              return;
+            }
+
+            runOnExecutor();
+          } catch (Throwable e) {
+            LOGGER.error("rest server onRequest error", e);
+            sendFailResponse(e);
           }
-
-          runOnExecutor();
-        } catch (Throwable e) {
-          LOGGER.error("rest server onRequest error", e);
-          sendFailResponse(e);
         }
-      }
-    });
+      });
+    } catch (Throwable e) {
+      LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
+      sendFailResponse(e);
+    }
   }
 
   private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
@@ -176,7 +181,7 @@ public abstract class AbstractRestInvocation {
           produceProcessor = ProduceProcessorManager.JSON_PROCESSOR;
           sendResponse(response);
         });
-      } catch (Exception e) {
+      } catch (Throwable e) {
         LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
         qpsFlowControlReject.value = true;
         sendFailResponse(e);
diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
index 7e9c9ac..4e82258 100644
--- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
+++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/TestAbstractRestInvocation.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 
 import javax.ws.rs.core.Response.Status;
 import javax.xml.ws.Holder;
@@ -885,17 +886,44 @@ public class TestAbstractRestInvocation {
       protected void runOnExecutor() {
         throw new RuntimeExceptionWithoutStackTrace("run on executor");
       }
+    };
+    restInvocation.requestEx = requestEx;
+    restInvocation.restOperationMeta = restOperation;
+
+    // will not throw exception
+    restInvocation.scheduleInvocation();
+  }
+
+  @Test
+  public void threadPoolReject(@Mocked OperationMeta operationMeta) {
+    RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("reject");
+    Executor executor = (task) -> {
+      throw rejectedExecutionException;
+    };
 
+    new Expectations() {
+      {
+        restOperation.getOperationMeta();
+        result = operationMeta;
+        operationMeta.getExecutor();
+        result = executor;
+      }
+    };
+
+    Holder<Throwable> holder = new Holder<>();
+    requestEx = new AbstractHttpServletRequestForTest();
+    restInvocation = new AbstractRestInvocationForTest() {
       @Override
       public void sendFailResponse(Throwable throwable) {
-        throw (Error) throwable;
+        holder.value = throwable;
       }
     };
     restInvocation.requestEx = requestEx;
     restInvocation.restOperationMeta = restOperation;
 
-    // will not throw exception
     restInvocation.scheduleInvocation();
+
+    Assert.assertSame(rejectedExecutionException, holder.value);
   }
 
   @Test
diff --git a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
index 2aabd84..3f0b232 100644
--- a/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
+++ b/core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
@@ -298,10 +298,16 @@ public class SCBEngine {
   }
 
   private void validAllInvocationFinished() throws InterruptedException {
+    long start = System.currentTimeMillis();
     while (true) {
-      if (invocationFinishedCounter.get() == invocationStartedCounter.get()) {
+      long remaining = invocationStartedCounter.get() - invocationFinishedCounter.get();
+      if (remaining == 0) {
         return;
       }
+
+      if (System.currentTimeMillis() - start > TimeUnit.SECONDS.toMillis(30)) {
+        LOGGER.error("wait for all requests timeout, abandon waiting, remaining requests: {}.", remaining);
+      }
       TimeUnit.SECONDS.sleep(1);
     }
   }
diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
index 0c49723..727eaef 100644
--- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
+++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerInvoke.java
@@ -18,6 +18,7 @@
 package org.apache.servicecomb.transport.highway;
 
 import java.util.Map;
+import java.util.concurrent.RejectedExecutionException;
 
 import javax.ws.rs.core.Response.Status;
 import javax.xml.ws.Holder;
@@ -186,7 +187,10 @@ public class HighwayServerInvoke {
       }
 
       operationMeta.getExecutor().execute(this::runInExecutor);
-    } catch (IllegalStateException e) {
+    } catch (Throwable e) {
+      if (e instanceof RejectedExecutionException) {
+        LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
+      }
       sendResponse(header.getContext(), Response.providerFailResp(e));
     }
   }