You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/09/24 01:12:55 UTC
[cxf] 01/02: CXF-8597: CXF JAXRS client not closing HTTP connections (#851)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.4.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 04cce94cfd9835f789016d79a905896027a68a30
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Thu Sep 23 19:42:18 2021 -0400
CXF-8597: CXF JAXRS client not closing HTTP connections (#851)
* CXF-8597: CXF JAXRS client not closing HTTP connections
* Added test case for generic WebClient flow
* Added test case for generic JAXRSClientFactoryBean / proxy flow
(cherry picked from commit d6a449a3a30aeb950b7de567d8d9e8aa5b00da36)
---
.../org/apache/cxf/jaxrs/impl/ResponseImpl.java | 22 ++-----
.../jaxrs/ClientHttpConnectionOutInterceptor.java | 71 ++++++++++++++++++++++
.../jaxrs/JAXRSMultithreadedClientTest.java | 17 ++++++
.../systest/jaxrs/JAXRSRequestDispatcherTest.java | 38 ++++++++++++
4 files changed, 130 insertions(+), 18 deletions(-)
diff --git a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java
index b7ba081..fc711f8 100644
--- a/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java
+++ b/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/impl/ResponseImpl.java
@@ -55,7 +55,6 @@ import javax.ws.rs.core.Response.Status.Family;
import javax.ws.rs.ext.ReaderInterceptor;
import javax.ws.rs.ext.RuntimeDelegate.HeaderDelegate;
import javax.xml.stream.XMLStreamReader;
-import javax.xml.transform.Source;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.io.ReaderInputStream;
@@ -405,7 +404,7 @@ public final class ResponseImpl extends Response {
@Override
public <T> T readEntity(Class<T> cls, Annotation[] anns) throws ProcessingException, IllegalStateException {
- return doReadEntity(cls, cls, anns, true);
+ return doReadEntity(cls, cls, anns);
}
@Override
@@ -413,20 +412,13 @@ public final class ResponseImpl extends Response {
public <T> T readEntity(GenericType<T> genType, Annotation[] anns)
throws ProcessingException, IllegalStateException {
return doReadEntity((Class<T>) genType.getRawType(),
- genType.getType(), anns, true);
+ genType.getType(), anns);
}
public <T> T doReadEntity(Class<T> cls, Type t, Annotation[] anns)
throws ProcessingException, IllegalStateException {
- return doReadEntity(cls, t, anns, false);
- }
-
- public <T> T doReadEntity(Class<T> cls, Type t, Annotation[] anns, boolean closeAfterRead)
- throws ProcessingException, IllegalStateException {
checkEntityIsClosed();
- //according to javadoc, should close when is not buffered.
- boolean shouldClose = !this.entityBufferred;
if (lastEntity != null && cls.isAssignableFrom(lastEntity.getClass())
&& !(lastEntity instanceof InputStream)) {
@@ -476,11 +468,7 @@ public final class ResponseImpl extends Response {
responseMessage);
// close the entity after readEntity is called.
T tCastLastEntity = castLastEntity();
- shouldClose = shouldClose && !(tCastLastEntity instanceof AutoCloseable)
- && !(tCastLastEntity instanceof Source);
- if (closeAfterRead && shouldClose) {
- close();
- }
+ autoClose(cls, false);
return tCastLastEntity;
} catch (NoContentException ex) {
//when content is empty, return null instead of throw exception to pass TCK
@@ -489,9 +477,7 @@ public final class ResponseImpl extends Response {
autoClose(cls, true);
reportMessageHandlerProblem("MSG_READER_PROBLEM", cls, mediaType, ex);
} else {
- if (closeAfterRead && shouldClose) {
- close();
- }
+ autoClose(cls, false);
return null;
}
} catch (Exception ex) {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java
new file mode 100644
index 0000000..66d8413
--- /dev/null
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/ClientHttpConnectionOutInterceptor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.jaxrs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+
+class ClientHttpConnectionOutInterceptor extends AbstractPhaseInterceptor<Message> {
+ private Collection<HttpURLConnection> connections = new ArrayList<>();
+
+ ClientHttpConnectionOutInterceptor() {
+ super(Phase.SEND_ENDING);
+ }
+
+ @Override
+ public void handleMessage(Message message) throws Fault {
+ final HttpURLConnection connection = (HttpURLConnection) message.get("http.connection");
+ synchronized (connections) {
+ connections.add(connection);
+ }
+ }
+
+ public boolean checkAllClosed() {
+ synchronized (connections) {
+ if (connections.isEmpty()) {
+ return false;
+ }
+
+ return !connections
+ .stream()
+ .anyMatch(this::hasUnclosedInputStream);
+ }
+ }
+
+ private boolean hasUnclosedInputStream(HttpURLConnection connection) {
+ try {
+ final InputStream inputStream = connection.getInputStream();
+ inputStream.read(new byte [] {}); /* 0 bytes to read */
+ return true;
+ } catch (IOException ex) {
+ // The HttpInputStream throws an IOException in case the input stream is already
+ // closed (since we actually read nothing).
+ return !ex.getMessage().equals("stream is closed");
+ }
+ }
+}
\ No newline at end of file
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java
index 92376f7..91533e9 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSMultithreadedClientTest.java
@@ -33,6 +33,7 @@ import javax.ws.rs.core.Response;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.jaxrs.client.Client;
import org.apache.cxf.jaxrs.client.JAXRSClientFactory;
+import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean;
import org.apache.cxf.jaxrs.client.WebClient;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -41,7 +42,9 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class JAXRSMultithreadedClientTest extends AbstractBusClientServerTestBase {
@@ -102,6 +105,20 @@ public class JAXRSMultithreadedClientTest extends AbstractBusClientServerTestBas
runProxies(proxy.echoThroughBookStoreSub(), 10, true, true);
}
+
+ @Test
+ public void testSimpleProxyEnsureResponseStreamIsClosed() throws Exception {
+ final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor();
+ final JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean();
+ bean.setAddress("http://localhost:" + PORT);
+ bean.setServiceClass(BookStore.class);
+ bean.getOutInterceptors().add(interceptor);
+
+ final BookStore proxy = bean.create(BookStore.class);
+ runProxies(proxy, 10, true, false);
+
+ assertThat(interceptor.checkAllClosed(), is(true));
+ }
private void runWebClients(WebClient client, int numberOfClients,
boolean threadSafe, boolean stateCanBeChanged) throws Exception {
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java
index 7002a30..5055d9d 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSRequestDispatcherTest.java
@@ -19,6 +19,7 @@
package org.apache.cxf.systest.jaxrs;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -33,8 +34,11 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase {
@@ -60,6 +64,7 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase
}
private void doTestGetBookHTML(String endpointAddress) throws Exception {
+
WebClient client = WebClient.create(endpointAddress)
.accept(MediaType.TEXT_HTML);
@@ -143,6 +148,38 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase
assertEquals("Welcome", welcome);
}
+ @Test
+ public void testGetBookHTMLFromEnsureResponseStreamIsUnclosed() throws Exception {
+ final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor();
+
+ String endpointAddress = "http://localhost:" + PORT + "/the/bookstore4/books/html/123";
+ WebClient client = WebClient.create(endpointAddress).accept(MediaType.TEXT_HTML);
+ WebClient.getConfig(client).getOutInterceptors().add(interceptor);
+
+ XMLSource source = client.get(XMLSource.class);
+ Map<String, String> namespaces = new HashMap<>();
+ namespaces.put("xhtml", "http://www.w3.org/1999/xhtml");
+ namespaces.put("books", "http://www.w3.org/books");
+ String value = source.getValue("xhtml:html/xhtml:body/xhtml:ul/books:bookTag", namespaces);
+ assertEquals("CXF Rocks", value);
+
+ assertThat(interceptor.checkAllClosed(), is(false));
+ }
+
+ @Test
+ public void testGetBookHTMLFromEnsureResponseStreamIsAutoClosed() throws Exception {
+ final ClientHttpConnectionOutInterceptor interceptor = new ClientHttpConnectionOutInterceptor();
+ final Map<String, Object> properties = Collections.singletonMap("response.stream.auto.close", true);
+
+ String endpointAddress = "http://localhost:" + PORT + "/the/bookstore4/books/html/123";
+ WebClient client = WebClient.create(endpointAddress, properties).accept(MediaType.TEXT_HTML);
+ WebClient.getConfig(client).getOutInterceptors().add(interceptor);
+
+ final String source = client.get(String.class);
+ assertThat(source, containsString("CXF Rocks"));
+ assertThat(interceptor.checkAllClosed(), is(true));
+ }
+
private void doTestGetBookHTMLFromWelcomeList(String address) throws Exception {
WebClient client = WebClient.create(address)
.accept(MediaType.TEXT_HTML);
@@ -154,4 +191,5 @@ public class JAXRSRequestDispatcherTest extends AbstractBusClientServerTestBase
String value = source.getValue("xhtml:html/xhtml:body/xhtml:ul/books:bookTag", namespaces);
assertEquals("Welcome to CXF", value);
}
+
}