You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by co...@apache.org on 2019/08/27 16:27:48 UTC

[cxf] branch master updated: CXF-8095 - When the workqueue is full for more than asyncExecuteTimeout milliseconds the work is never added. Thanks to Jan Hallonsten for the patch

This is an automated email from the ASF dual-hosted git repository.

coheigea pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acdfbf  CXF-8095 - When the workqueue is full for more than asyncExecuteTimeout milliseconds the work is never added. Thanks to Jan Hallonsten for the patch
8acdfbf is described below

commit 8acdfbfb9ce4847be78e8b1922d91211fada9717
Author: Colm O hEigeartaigh <co...@apache.org>
AuthorDate: Tue Aug 27 17:27:05 2019 +0100

    CXF-8095 - When the workqueue is full for more than asyncExecuteTimeout milliseconds the work is never added. Thanks to Jan Hallonsten for the patch
---
 .../http/asyncclient/AsyncHTTPConduit.java         |  6 ++-
 .../asyncclient/CXFHttpAsyncResponseConsumer.java  |  6 ++-
 .../http/asyncclient/AsyncHTTPConduitTest.java     | 62 ++++++++++++++++++++--
 3 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
index 8c738b7..5922e57 100755
--- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
+++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
@@ -599,10 +599,12 @@ public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
                       callback);
         }
 
-        protected void retrySetHttpResponse(HttpResponse r) {
-            if (httpResponse == null && isAsync) {
+        protected boolean retrySetHttpResponse(HttpResponse r) {
+            if (isAsync) {
                 setHttpResponse(r);
             }
+
+            return !isAsync;
         }
         protected synchronized void setHttpResponse(HttpResponse r) {
             httpResponse = r;
diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
index ae676ed..09af41f 100644
--- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
+++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncResponseConsumer.java
@@ -69,8 +69,10 @@ public class CXFHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<B
 
     @Override
     public void consumeContent(final ContentDecoder dec, final IOControl ioc) throws IOException {
-        outstream.retrySetHttpResponse(response);
-        buf.consumeContent(dec, ioc);
+        // Only consume content when the work was accepted by the work queue
+        if (outstream.retrySetHttpResponse(response)) {
+            buf.consumeContent(dec, ioc);
+        }
     }
 
     @Override
diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
index 9ed69af..7ea4a77 100644
--- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
+++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
@@ -34,12 +34,15 @@ import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.frontend.ClientProxy;
 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transport.http.HTTPConduitFactory;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.SOAPService;
 import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
@@ -154,14 +157,14 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
             //expected!!!
         }
     }
-    
-    
+
+
     @Test
     public void testTimeoutWithPropertySetting() throws Exception {
         ((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout",
             "3000");
         updateAddressPort(g, PORT);
-        
+
         try {
             assertEquals("Hello " + request, g.greetMeLater(-5000));
             fail();
@@ -183,7 +186,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
             //expected!!!
         }
     }
-    
+
     @Test
     public void testTimeoutAsyncWithPropertySetting() throws Exception {
         updateAddressPort(g, PORT);
@@ -197,7 +200,7 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
             //expected!!!
         }
     }
-    
+
     @Test
     public void testConnectIssue() throws Exception {
         updateAddressPort(g, PORT_INV);
@@ -282,6 +285,55 @@ public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
     }
 
     @Test
+    public void testCallAsyncWithFullWorkQueue() throws Exception {
+        Bus bus = BusFactory.getThreadDefaultBus();
+        WorkQueueManager workQueueManager = bus.getExtension(WorkQueueManager.class);
+        AutomaticWorkQueueImpl automaticWorkQueue1 = (AutomaticWorkQueueImpl)workQueueManager.getAutomaticWorkQueue();
+        updateAddressPort(g, PORT);
+
+        Client client = ClientProxy.getClient(g);
+        HTTPConduit http = (HTTPConduit) client.getConduit();
+
+        HTTPClientPolicy httpClientPolicy = new HTTPClientPolicy();
+
+        int asyncExecuteTimeout = 500;
+        httpClientPolicy.setAsyncExecuteTimeout(asyncExecuteTimeout);
+
+        http.setClient(httpClientPolicy);
+
+        long repeat = automaticWorkQueue1.getHighWaterMark() + automaticWorkQueue1.getMaxSize() + 1;
+        CountDownLatch initialThreadsLatch = new CountDownLatch(automaticWorkQueue1.getHighWaterMark());
+        CountDownLatch doneLatch = new CountDownLatch((int) repeat);
+        AtomicInteger threadCount = new AtomicInteger();
+
+        for (long i = 0; i < repeat; i++) {
+            g.greetMeLaterAsync(-50, (res) -> {
+
+                try {
+                    int myCount = threadCount.getAndIncrement();
+
+                    if (myCount < automaticWorkQueue1.getHighWaterMark()) {
+                        // Sleep long enough so that the workqueue will fill up and then
+                        // handleResponseOnWorkqueue will fail for the calls from both responseReceived and consumeContent
+                        Thread.sleep(3 * asyncExecuteTimeout);
+                        initialThreadsLatch.countDown();
+                    } else {
+                        Thread.sleep(50);
+                    }
+                    initialThreadsLatch.await();
+                    doneLatch.countDown();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        doneLatch.await(30, TimeUnit.SECONDS);
+
+        assertEquals("All responses should be handled eventually", 0, doneLatch.getCount());
+    }
+
+
+    @Test
     @Ignore("peformance test")
     public void testCalls() throws Exception {
         updateAddressPort(g, PORT);