You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2023/06/13 17:30:14 UTC

[cxf] branch main updated: [CXF-8885] Make an attempt to get the HttpClient selector threads to shutdown sooner Technically, they should shut themselves down about 3 seconds after a garbage collection assuming the client they are associated with is cleaned up and released. However, if many clients are used, this may be too long and thus we'll make an attempt (via some hacks) to get the thread to shutdown sooner. It's a shame HttpClient doesn't implement Closeable. :(

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/main by this push:
     new 65711680af [CXF-8885] Make an attempt to get the HttpClient selector threads to shutdown sooner Technically, they should shut themselves down about 3 seconds after a garbage collection assuming the client they are associated with is cleaned up and released.   However, if many clients are used, this may be too long and thus we'll make an attempt (via some hacks) to get the thread to shutdown sooner.  It's a shame HttpClient doesn't implement Closeable. :(
65711680af is described below

commit 65711680af99de16f56cbaa819d207edb9428e8a
Author: Daniel Kulp <da...@kulp.com>
AuthorDate: Tue Jun 13 13:27:57 2023 -0400

    [CXF-8885] Make an attempt to get the HttpClient selector threads to shutdown sooner
    Technically, they should shut themselves down about 3 seconds after a garbage collection assuming the client they are associated with is cleaned up and released.   However, if many clients are used, this may be too long and thus we'll make an attempt (via some hacks) to get the thread to shutdown sooner.  It's a shame HttpClient doesn't implement Closeable. :(
---
 .../cxf/transport/http/HttpClientHTTPConduit.java  | 40 ++++++++++
 .../cxf/systest/jaxws/JaxWsClientThreadTest.java   | 86 ++++++++++++++++++++++
 2 files changed, 126 insertions(+)

diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
index 2546e24bac..af7c81b2ec 100644
--- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
+++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PushbackInputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -33,6 +34,7 @@ import java.net.ProxySelector;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLConnection;
 import java.net.UnknownHostException;
 import java.net.http.HttpClient;
 import java.net.http.HttpClient.Redirect;
@@ -71,6 +73,7 @@ import javax.net.ssl.SSLSession;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.util.PropertyUtils;
+import org.apache.cxf.common.util.ReflectionUtil;
 import org.apache.cxf.configuration.jsse.TLSClientParameters;
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.helpers.JavaUtils;
@@ -110,6 +113,43 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
                 || lastURL.getPort() != url.getPort();
     }
     
+    /**
+     * Close the conduit
+     */
+    public void close() {
+        if (client != null) {
+            String name = client.toString();
+            client = null;
+            tryToShutdownSelector(name);
+        }
+        defaultAddress = null;
+        super.close();
+    }
+    private synchronized void tryToShutdownSelector(String n) {
+        // it can take three seconds (or more) for the JVM to determine the client
+        // is unreferenced and then shutdown the selector thread, we'll try and speed that
+        // up.  This is somewhat of a complete hack.   
+        int idx = n.lastIndexOf('(');
+        if (idx > 0) {
+            n = n.substring(idx + 1);
+            n = n.substring(0, n.length() - 1);
+            n = "HttpClient-" + n + "-SelectorManager";
+        }
+        try {        
+            ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+            Thread threads[] = new Thread[rootGroup.activeCount()];
+            int cnt = rootGroup.enumerate(threads);
+            for (int x = 0; x < cnt; x++) {
+                if (threads[x].getName().contains(n)) {
+                    threads[x].interrupt();
+                }            
+            }
+        } catch (Throwable t) {
+            //ignore, nothing we can do except wait for the garbage collection
+            //and then the three seconds for the timeout
+        }
+    }
+    
     @Override
     protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy) throws IOException {
         URI uri = address.getURI();
diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java
index 921169dbc7..223faba554 100644
--- a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java
+++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/JaxWsClientThreadTest.java
@@ -220,4 +220,90 @@ public class JaxWsClientThreadTest extends AbstractCXFTest {
                    .get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY));
     }
 
+    @Test
+    public void testMultiGreeterThreadSafety() throws Throwable {
+
+        URL url = getClass().getResource("/wsdl/hello_world.wsdl");
+        final jakarta.xml.ws.Service s = jakarta.xml.ws.Service.create(url, serviceName);
+
+        final int numThreads = 50;
+        final Throwable[] errorHolder = new Throwable[numThreads];
+        
+        ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
+        ThreadGroup parentGroup;
+        while ((parentGroup = rootGroup.getParent()) != null) {
+            rootGroup = parentGroup;
+        }
+        int start = rootGroup.activeCount();
+
+        Thread[] threads = new Thread[numThreads];
+        for (int i = 0; i < numThreads; i++) {
+            final int tid = i;
+            Runnable r = new Runnable() {
+                public void run() {
+                    final Greeter greeter = s.getPort(portName, Greeter.class);
+                    try (AutoCloseable c = (AutoCloseable)greeter){
+                        final InvocationHandler handler = Proxy.getInvocationHandler(greeter);
+                        Map<String, Object> requestContext = ((BindingProvider)handler).getRequestContext();                        
+                        
+                        final String protocol = "http-" + Thread.currentThread().getId();
+                        for (int i = 0; i < 10; i++) {
+                            String threadSpecificaddress = protocol + "://localhost:80/" + i;
+                            requestContext.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
+                                               threadSpecificaddress);
+                            assertEquals("we get what we set", threadSpecificaddress, requestContext
+                                         .get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY));
+                            try {
+                                greeter.greetMe("Hi");
+                            } catch (WebServiceException expected) {
+                                //expected.getCause().printStackTrace();
+                                MalformedURLException mue = (MalformedURLException)expected
+                                    .getCause();
+                                if (mue == null || mue.getMessage() == null) {
+                                    throw expected;
+                                }
+                                assertTrue("protocol contains thread id from context", mue.getMessage()
+                                    .indexOf(protocol) != 0);
+                            }
+
+                            requestContext.remove(BindingProvider.ENDPOINT_ADDRESS_PROPERTY);
+                            assertNull("property is null", requestContext
+                                         .get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY));
+
+                        }
+                    } catch (Throwable t) {
+                        // capture assert failures
+                        errorHolder[tid] = t;
+                    }
+                }
+            };
+            threads[i] = new Thread(r);
+        }
+        for (int i = 0; i < numThreads; i++) {
+            threads[i].start();
+        }
+        for (int i = 0; i < numThreads; i++) {
+            threads[i].join();
+        }
+        for (int i = 0; i < numThreads; i++) {
+            if (errorHolder[i] != null) {
+                throw errorHolder[i];
+            }
+        }
+        
+        int end = rootGroup.activeCount();
+        int count = 0;
+        while (end > start && count < 30) {
+            Thread.sleep(100);
+            System.gc();
+            end = rootGroup.activeCount();
+        }
+        
+        
+        System.out.println("Start: " + start + "     End: " + end);
+        // we'll allow a few extra threads to be created for various things like GC, but we definitely shouldn't be anywhere 
+        // near numThreads of extra threads
+        assertTrue("Too many extra trheads created  " + end + "/" + start, (end - start) < 5);
+
+    }
 }