You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/09/24 01:12:55 UTC

[cxf] 01/02: CXF-8597: CXF JAXRS client not closing HTTP connections (#851)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.4.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 04cce94cfd9835f789016d79a905896027a68a30
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Thu Sep 23 19:42:18 2021 -0400

    CXF-8597: CXF JAXRS client not closing HTTP connections (#851)
    
    * CXF-8597: CXF JAXRS client not closing HTTP connections
    
    * Added test case for generic WebClient flow
    
    * Added test case for generic JAXRSClientFactoryBean / proxy flow
    
    (cherry picked from commit d6a449a3a30aeb950b7de567d8d9e8aa5b00da36)
---
 .../org/apache/cxf/jaxrs/impl/ResponseImpl.java    | 22 ++-----
 .../jaxrs/ClientHttpConnectionOutInterceptor.java  | 71 ++++++++++++++++++++++
 .../jaxrs/JAXRSMultithreadedClientTest.java        | 17 ++++++
 .../systest/jaxrs/JAXRSRequestDispatcherTest.java  | 38 ++++++++++++
 4 files changed, 130 insertions(+), 18 deletions(-)

diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java
index b7ba081..fc711f8 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java
@@ -55,7 +55,6 @@ import javax.ws.rs.core.Response.Status.Family;
 import javax.ws.rs.ext.ReaderInterceptor;
 import javax.ws.rs.ext.RuntimeDelegate.HeaderDelegate;
 import javax.xml.stream.XMLStreamReader;
-import javax.xml.transform.Source;
 
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.io.ReaderInputStream;
@@ -405,7 +404,7 @@ public final class ResponseImpl extends Response {
 
     @Override
     public <T> T readEntity(Class<T> cls, Annotation[] anns) throws ProcessingException, IllegalStateException {
-        return doReadEntity(cls, cls, anns, true);
+        return doReadEntity(cls, cls, anns);
     }
 
     @Override
@@ -413,20 +412,13 @@ public final class ResponseImpl extends Response {
     public <T> T readEntity(GenericType<T> genType, Annotation[] anns)
         throws ProcessingException, IllegalStateException {
         return doReadEntity((Class<T>) genType.getRawType(),
-                            genType.getType(), anns, true);
+                            genType.getType(), anns);
     }
 
     public <T> T doReadEntity(Class<T> cls, Type t, Annotation[] anns)
         throws ProcessingException, IllegalStateException {
-        return doReadEntity(cls, t, anns, false);
-    }
-
-    public <T> T doReadEntity(Class<T> cls, Type t, Annotation[] anns, boolean closeAfterRead)
-        throws ProcessingException, IllegalStateException {
 
         checkEntityIsClosed();
-        //according to javadoc, should close when is not buffered.
-        boolean shouldClose = !this.entityBufferred;
 
         if (lastEntity != null && cls.isAssignableFrom(lastEntity.getClass())
             && !(lastEntity instanceof InputStream)) {
@@ -476,11 +468,7 @@ public final class ResponseImpl extends Response {
                                                                   responseMessage);
                 // close the entity after readEntity is called.
                 T tCastLastEntity = castLastEntity();
-                shouldClose = shouldClose && !(tCastLastEntity instanceof AutoCloseable)
-                    && !(tCastLastEntity instanceof Source);
-                if (closeAfterRead && shouldClose) {
-                    close();
-                }
+                autoClose(cls, false);
                 return tCastLastEntity;
             } catch (NoContentException ex) {
                 //when content is empty, return null instead of throw exception to pass TCK
@@ -489,9 +477,7 @@ public final class ResponseImpl extends Response {
                     autoClose(cls, true);
                     reportMessageHandlerProblem("MSG_READER_PROBLEM", cls, mediaType, ex);
                 } else {
-                    if (closeAfterRead && shouldClose) {
-                        close();
-                    }
+                    autoClose(cls, false);
                     return null;
                 }
             } catch (Exception ex) {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java
new file mode 100644
index 0000000..66d8413
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+
+/** Test-only out interceptor that records the HttpURLConnection of every message it sees. */
+class ClientHttpConnectionOutInterceptor extends AbstractPhaseInterceptor<Message> {
+    // Final: the collection doubles as the monitor guarding all access to itself.
+    private final Collection<HttpURLConnection> connections = new ArrayList<>();
+
+    ClientHttpConnectionOutInterceptor() {
+        super(Phase.SEND_ENDING);
+    }
+
+    /** Records whatever the message holds under the "http.connection" key. */
+    @Override
+    public void handleMessage(Message message) throws Fault {
+        final HttpURLConnection connection = (HttpURLConnection) message.get("http.connection");
+        synchronized (connections) {
+            connections.add(connection);
+        }
+    }
+
+    /** @return true only if a connection was recorded and no recorded one has an open stream. */
+    public boolean checkAllClosed() {
+        synchronized (connections) {
+            return !connections.isEmpty()
+                && connections.stream().noneMatch(this::hasUnclosedInputStream);
+        }
+    }
+
+    /** Probes the connection's input stream with a zero-length read to see if it is still open. */
+    private boolean hasUnclosedInputStream(HttpURLConnection connection) {
+        try {
+            connection.getInputStream().read(new byte[0]); /* 0 bytes to read */
+            return true;
+        } catch (IOException ex) {
+            // The HttpInputStream throws an IOException in case the input stream is already
+            // closed (since we actually read nothing). Compare literal-first so that an
+            // exception without a message counts as "not closed" instead of raising an NPE.
+            return !"stream is closed".equals(ex.getMessage());
+        }
+    }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java
index 92376f7..91533e9 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.Response;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.jaxrs.client.Client;
 import org.apache.cxf.jaxrs.client.JAXRSClientFactory;
+import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
 import org.apache.cxf.jaxrs.client.WebClient;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
@@ -41,7 +42,9 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class JAXRSMultithreadedClientTest extends AbstractBusClientServerTestBase {
@@ -102,6 +105,20 @@ public class JAXRSMultithreadedClientTest extends AbstractBusClientServerTestBas
 
         runProxies(proxy.echoThroughBookStoreSub(), 10, true, true);
     }
+    
+    @Test
+    public void testSimpleProxyEnsureResponseStreamIsClosed() throws Exception {
+        final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor();
+        final JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean();
+        bean.setAddress("http://localhost:" + PORT);
+        bean.setServiceClass(BookStore.class);
+        bean.getOutInterceptors().add(interceptor);
+        // Build the BookStore proxy from the bean that carries the recording interceptor.
+        final BookStore proxy = bean.create(BookStore.class);
+        runProxies(proxy, 10, true, false);
+        // Expect that connections were recorded and none of them still has an open response stream.
+        assertThat(interceptor.checkAllClosed(), is(true));
+    }
 
     private void runWebClients(WebClient client, int numberOfClients,
         boolean threadSafe, boolean stateCanBeChanged) throws Exception {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java
index 7002a30..5055d9d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.systest.jaxrs;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -33,8 +34,11 @@ import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase {
@@ -60,6 +64,7 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase
     }
 
     private void doTestGetBookHTML(String endpointAddress) throws Exception {
+
         WebClient client = WebClient.create(endpointAddress)
             .accept(MediaType.TEXT_HTML);
 
@@ -143,6 +148,38 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase
         assertEquals("Welcome", welcome);
     }
 
+    @Test
+    public void testGetBookHTMLFromEnsureResponseStreamIsUnclosed() throws Exception {
+        final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor();
+        // Plain WebClient.create(address): no "response.stream.auto.close" property is supplied here.
+        String endpointAddress = "http://localhost:" + PORT + "/the/bookstore4/books/html/123";
+        WebClient client = WebClient.create(endpointAddress).accept(MediaType.TEXT_HTML);
+        WebClient.getConfig(client).getOutInterceptors().add(interceptor);
+        // Read the response as an XMLSource and look up a namespaced element value in it.
+        XMLSource source = client.get(XMLSource.class);
+        Map<String, String> namespaces = new HashMap<>();
+        namespaces.put("xhtml", "http://www.w3.org/1999/xhtml");
+        namespaces.put("books", "http://www.w3.org/books");
+        String value = source.getValue("xhtml:html/xhtml:body/xhtml:ul/books:bookTag", namespaces);
+        assertEquals("CXF Rocks", value);
+        // checkAllClosed() is expected to be false: per the test name, the stream stays unclosed.
+        assertThat(interceptor.checkAllClosed(), is(false));
+    }
+    
+    @Test
+    public void testGetBookHTMLFromEnsureResponseStreamIsAutoClosed() throws Exception {
+        final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor();
+        final Map<String, Object> properties = Collections.singletonMap("response.stream.auto.close", true);
+        // The properties map is handed to WebClient.create together with the endpoint address.
+        String endpointAddress = "http://localhost:" + PORT + "/the/bookstore4/books/html/123";
+        WebClient client = WebClient.create(endpointAddress, properties).accept(MediaType.TEXT_HTML);
+        WebClient.getConfig(client).getOutInterceptors().add(interceptor);
+        // Read the body as a String, then expect every recorded connection's stream to be closed.
+        final String source = client.get(String.class);
+        assertThat(source, containsString("CXF Rocks"));
+        assertThat(interceptor.checkAllClosed(), is(true));
+    }
+    
     private void doTestGetBookHTMLFromWelcomeList(String address) throws Exception {
         WebClient client = WebClient.create(address)
             .accept(MediaType.TEXT_HTML);
@@ -154,4 +191,5 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase
         String value = source.getValue("xhtml:html/xhtml:body/xhtml:ul/books:bookTag", namespaces);
         assertEquals("Welcome to CXF", value);
     }
+    
 }