You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/11/13 15:57:47 UTC
[cxf] branch master updated: CXF-8221: Upgrade Http Components Core and Client to 5.0 (#870)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 910168d CXF-8221: Upgrade Http Components Core and Client to 5.0 (#870)
910168d is described below
commit 910168db187d4e34ef3228de7ca178e789cefd5f
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Sat Nov 13 10:57:36 2021 -0500
CXF-8221: Upgrade Http Components Core and Client to 5.0 (#870)
---
bom/pom.xml | 5 +
parent/pom.xml | 6 +
rt/transports/http-hc/pom.xml | 5 -
rt/transports/{http-hc => http-hc5}/pom.xml | 39 +-
.../http/asyncclient/hc5/AnyAuthScope.java | 33 +
.../http/asyncclient/hc5/AsyncHTTPConduit.java | 979 +++++++++++++++++++++
.../asyncclient/hc5/AsyncHTTPConduitFactory.java | 398 +++++++++
.../asyncclient/hc5/AsyncHttpTransportFactory.java | 136 +++
.../hc5/CXFHttpAsyncRequestProducer.java | 148 ++++
.../hc5/CXFHttpAsyncResponseConsumer.java | 112 +++
.../http/asyncclient/hc5/CXFHttpRequest.java | 74 ++
.../http/asyncclient/hc5/CXFResponseCallback.java | 26 +
.../http/asyncclient/hc5/MutableHttpEntity.java | 120 +++
.../http/asyncclient/hc5/SharedInputBuffer.java | 279 ++++++
.../http/asyncclient/hc5/SharedOutputBuffer.java | 327 +++++++
.../main/resources/META-INF/cxf/bus-extensions.txt | 3 +
.../http/asyncclient/hc5/AsyncHTTPConduitTest.java | 334 +++++++
rt/transports/pom.xml | 1 +
systests/pom.xml | 1 +
systests/transport-hc5/pom.xml | 163 ++++
.../org/apache/cxf/systest/hc5/jaxrs/Book.java | 116 +++
.../systest/hc5/jaxrs/BookServerAsyncClient.java | 107 +++
.../apache/cxf/systest/hc5/jaxrs/BookStore.java | 230 +++++
.../org/apache/cxf/systest/hc5/jaxrs/Chapter.java | 106 +++
.../systest/hc5/jaxrs/JAXRSAsyncClientTest.java | 638 ++++++++++++++
.../org/apache/cxf/systest/hc5/jaxrs/RETRIEVE.java | 33 +
.../systest/hc5/jaxws/JAXWSAsyncClientTest.java | 140 +++
27 files changed, 4519 insertions(+), 40 deletions(-)
diff --git a/bom/pom.xml b/bom/pom.xml
index 95938a1..9f768e2 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -301,6 +301,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-hc5</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/parent/pom.xml b/parent/pom.xml
index fd2e489..978bc2c 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -125,6 +125,7 @@
<cxf.httpcomponents.client.version>4.5.13</cxf.httpcomponents.client.version>
<cxf.httpcomponents.core.version.range>[4.3,4.5.0)</cxf.httpcomponents.core.version.range>
<cxf.httpcomponents.core.version>4.4.14</cxf.httpcomponents.core.version>
+ <cxf.httpcomponents.client5.version>5.1.1</cxf.httpcomponents.client5.version>
<cxf.jackson.version>2.12.5</cxf.jackson.version>
<cxf.jackson.databind.version>2.12.5</cxf.jackson.databind.version>
<cxf.jacorb.version>3.9</cxf.jacorb.version>
@@ -944,6 +945,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents.client5</groupId>
+ <artifactId>httpclient5</artifactId>
+ <version>${cxf.httpcomponents.client5.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-artifact</artifactId>
<scope>provided</scope>
diff --git a/rt/transports/http-hc/pom.xml b/rt/transports/http-hc/pom.xml
index ea83724..779cbd5 100644
--- a/rt/transports/http-hc/pom.xml
+++ b/rt/transports/http-hc/pom.xml
@@ -45,11 +45,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
<scope>provided</scope>
diff --git a/rt/transports/http-hc/pom.xml b/rt/transports/http-hc5/pom.xml
similarity index 67%
copy from rt/transports/http-hc/pom.xml
copy to rt/transports/http-hc5/pom.xml
index ea83724..0c1c386 100644
--- a/rt/transports/http-hc/pom.xml
+++ b/rt/transports/http-hc5/pom.xml
@@ -19,7 +19,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <artifactId>cxf-rt-transports-http-hc</artifactId>
+ <artifactId>cxf-rt-transports-http-hc5</artifactId>
<packaging>bundle</packaging>
<name>Apache CXF Runtime HTTP Async Transport</name>
<description>Apache CXF Runtime HTTP Async Transport</description>
@@ -31,12 +31,7 @@
<relativePath>../../../parent/pom.xml</relativePath>
</parent>
<properties>
- <cxf.module.name>org.apache.cxf.transport.http.hc</cxf.module.name>
- <cxf.bundle.activator>org.apache.cxf.transport.http.asyncclient.Activator</cxf.bundle.activator>
- <cxf.osgi.import>
- javax.annotation;version="${cxf.osgi.javax.annotation.version}",
- *
- </cxf.osgi.import>
+ <cxf.module.name>org.apache.cxf.transport.http.hc5</cxf.module.name>
</properties>
<dependencies>
<dependency>
@@ -45,31 +40,12 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.core</artifactId>
- <scope>provided</scope>
- <optional />
- </dependency>
- <dependency>
- <groupId>org.osgi</groupId>
- <artifactId>osgi.cmpn</artifactId>
- <scope>provided</scope>
- <optional />
- </dependency>
- <dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-core</artifactId>
- <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http</artifactId>
- <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -85,29 +61,22 @@
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore-nio</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpasyncclient</artifactId>
+ <groupId>org.apache.httpcomponents.client5</groupId>
+ <artifactId>httpclient5</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
- <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-frontend-jaxws</artifactId>
- <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-testutils</artifactId>
- <version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AnyAuthScope.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AnyAuthScope.java
new file mode 100644
index 0000000..c79815f
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AnyAuthScope.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import org.apache.hc.client5.http.auth.AuthScope;
+
+/**
+ * Credential scope intended to apply to every target, whatever its protocol,
+ * host, port, realm or authentication scheme.
+ */
+class AnyAuthScope extends AuthScope {
+    AnyAuthScope() {
+        // All attributes are wildcards: null for the textual ones and -1 for the port.
+        // A concrete port (the previous value was 1) is not a wildcard, so a scope
+        // for any other port that is compared against this one would be rejected.
+        super(null, null, -1, null, null);
+    }
+
+    /**
+     * Reports a positive match for every scope.
+     *
+     * @param that the scope being compared (ignored)
+     * @return always {@code 1}
+     */
+    @Override
+    public int match(AuthScope that) {
+        return 1;
+    }
+}
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
new file mode 100644
index 0000000..d6195c6
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java
@@ -0,0 +1,979 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.security.GeneralSecurityException;
+import java.security.Principal;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.common.util.PropertyUtils;
+import org.apache.cxf.common.util.StringUtils;
+import org.apache.cxf.configuration.jsse.SSLUtils;
+import org.apache.cxf.configuration.jsse.TLSClientParameters;
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.io.CacheAndWriteOutputStream;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.io.CopyingOutputStream;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.Address;
+import org.apache.cxf.transport.http.Headers;
+import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
+import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory.UseAsyncPolicy;
+import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.version.Version;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.hc.client5.http.async.HttpAsyncClient;
+import org.apache.hc.client5.http.auth.AuthSchemeFactory;
+import org.apache.hc.client5.http.auth.AuthScope;
+import org.apache.hc.client5.http.auth.Credentials;
+import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
+import org.apache.hc.client5.http.protocol.HttpClientContext;
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.config.Registry;
+import org.apache.hc.core5.http.config.RegistryBuilder;
+import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
+import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
+import org.apache.hc.core5.reactor.ssl.TlsDetails;
+import org.apache.hc.core5.util.Timeout;
+
+/**
+ * Async HTTP Conduit using Apache HttpClient 5
+ */
+public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
+ public static final String USE_ASYNC = "use.async.http.conduit";
+
+ private final AsyncHTTPConduitFactory factory;
+ private volatile int lastTlsHash = -1;
+ private volatile Object sslState;
+ private volatile URI sslURL;
+ private volatile SSLContext sslContext;
+ private volatile SSLSession session;
+ private volatile CloseableHttpAsyncClient client;
+
+    /**
+     * Creates the conduit and remembers the factory it came from.
+     *
+     * @param b the bus passed to the parent conduit
+     * @param ei the endpoint info passed to the parent conduit
+     * @param t the target endpoint reference passed to the parent conduit
+     * @param factory the factory later used to create the asynchronous client
+     * @throws IOException if the parent constructor fails
+     */
+    public AsyncHTTPConduit(Bus b,
+                            EndpointInfo ei,
+                            EndpointReferenceType t,
+                            AsyncHTTPConduitFactory factory) throws IOException {
+        super(b, ei, t);
+        this.factory = factory;
+    }
+
+    /**
+     * Returns the asynchronous client of this conduit, asking the factory to
+     * create it on first use.
+     *
+     * @return the client, never {@code null}
+     * @throws IOException if the factory did not produce a client
+     */
+    public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws IOException {
+        CloseableHttpAsyncClient current = client;
+        if (current == null) {
+            current = factory.createClient(this);
+            client = current;
+        }
+        if (current == null) {
+            throw new IOException("HttpAsyncClient is null");
+        }
+        return current;
+    }
+
+    /**
+     * @return the factory that was handed to this conduit at construction time
+     */
+    public AsyncHTTPConduitFactory getAsyncHTTPConduitFactory() {
+        return this.factory;
+    }
+
+    /**
+     * Decides whether the message is sent through the asynchronous client or
+     * through the parent (URLConnection based) conduit, and prepares the message
+     * accordingly.
+     * <p>
+     * The parent conduit is used when the factory is shut down, when the
+     * {@link #USE_ASYNC} policy resolves to "do not use async", or when an
+     * {@code https} target is configured with an SSLSocketFactory. Otherwise a
+     * {@link CXFHttpRequest} carrying the entity, timeouts and proxy settings is
+     * stored on the message.
+     *
+     * @param message the outgoing message
+     * @param address the target address; an {@code hc://} or {@code hc5://} prefix is stripped
+     * @param csPolicy the client policy supplying the timeouts
+     * @throws IOException if the address is not a valid {@code http}/{@code https} URI
+     */
+    @Override
+    protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy) throws IOException {
+        if (factory.isShutdown()) {
+            message.put(USE_ASYNC, Boolean.FALSE);
+            super.setupConnection(message, address, csPolicy);
+            return;
+        }
+        propagateJaxwsSpecTimeoutSettings(message, csPolicy);
+        boolean addressChanged = false;
+        // need to do some clean up work on the URI address
+        URI uri = address.getURI();
+        String uriString = uri.toString();
+        if (uriString.startsWith("hc://")) {
+            uriString = uriString.substring(5);
+            addressChanged = true;
+        } else if (uriString.startsWith("hc5://")) {
+            uriString = uriString.substring(6);
+            addressChanged = true;
+        }
+
+        if (addressChanged) {
+            try {
+                uri = new URI(uriString);
+            } catch (URISyntaxException ex) {
+                // MalformedURLException has no cause-taking constructor, so attach
+                // the parse failure explicitly instead of discarding it.
+                final MalformedURLException malformed = new MalformedURLException("unsupport uri: " + uriString);
+                malformed.initCause(ex);
+                throw malformed;
+            }
+        }
+
+        String s = uri.getScheme();
+        if (!"http".equals(s) && !"https".equals(s)) {
+            throw new MalformedURLException("unknown protocol: " + s);
+        }
+
+        // a per-message setting wins over the factory-wide policy
+        Object o = message.getContextualProperty(USE_ASYNC);
+        if (o == null) {
+            o = factory.getUseAsyncPolicy();
+        }
+
+        switch (UseAsyncPolicy.getPolicy(o)) {
+        case ALWAYS:
+            o = true;
+            break;
+        case NEVER:
+            o = false;
+            break;
+        case ASYNC_ONLY:
+        default:
+            o = !message.getExchange().isSynchronous();
+            break;
+        }
+
+        // check tlsClientParameters from message header
+        TLSClientParameters clientParameters = message.get(TLSClientParameters.class);
+        if (clientParameters == null) {
+            clientParameters = tlsClientParameters;
+        }
+
+        if ("https".equals(uri.getScheme())
+            && clientParameters != null
+            && clientParameters.getSSLSocketFactory() != null) {
+            //if they configured in an SSLSocketFactory, we cannot do anything
+            //with it as the NIO based transport cannot use socket created from
+            //the SSLSocketFactory.
+            o = false;
+        }
+
+        if (!PropertyUtils.isTrue(o)) {
+            message.put(USE_ASYNC, Boolean.FALSE);
+            super.setupConnection(message, addressChanged ? new Address(uriString, uri) : address, csPolicy);
+            return;
+        }
+
+        if (StringUtils.isEmpty(uri.getPath())) {
+            //hc needs to have the path be "/"
+            uri = uri.resolve("/");
+        }
+
+        message.put(USE_ASYNC, Boolean.TRUE);
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Asynchronous connection to " + uri.toString() + " has been set up");
+        }
+        message.put("http.scheme", uri.getScheme());
+        String httpRequestMethod =
+            (String)message.get(Message.HTTP_REQUEST_METHOD);
+        if (httpRequestMethod == null) {
+            httpRequestMethod = "POST";
+            message.put(Message.HTTP_REQUEST_METHOD, httpRequestMethod);
+        }
+        final CXFHttpRequest e = new CXFHttpRequest(httpRequestMethod, uri);
+        final String contentType = (String)message.get(Message.CONTENT_TYPE);
+        final MutableHttpEntity entity = new MutableHttpEntity(contentType, null, true) {
+            public boolean isRepeatable() {
+                return e.getOutputStream().retransmitable();
+            }
+        };
+
+        e.setEntity(entity);
+
+        final RequestConfig.Builder b = RequestConfig
+            .custom()
+            .setConnectTimeout(Timeout.ofMilliseconds(csPolicy.getConnectionTimeout()))
+            .setResponseTimeout(Timeout.ofMilliseconds(csPolicy.getReceiveTimeout()))
+            .setConnectionRequestTimeout(Timeout.ofMilliseconds(csPolicy.getConnectionRequestTimeout()));
+
+        final Proxy p = proxyFactory.createProxy(csPolicy, uri);
+        if (p != null && p.type() != Proxy.Type.DIRECT) {
+            InetSocketAddress isa = (InetSocketAddress)p.address();
+            HttpHost proxy = new HttpHost(isa.getHostString(), isa.getPort());
+            b.setProxy(proxy);
+        }
+        e.setConfig(b.build());
+
+        message.put(CXFHttpRequest.class, e);
+    }
+
+    /**
+     * Copies the timeouts determined for this message onto the policy, but only
+     * where the policy currently holds 60000 (receive) or 30000 (connection).
+     *
+     * @param message the outgoing message
+     * @param csPolicy the policy that is updated in place
+     */
+    private void propagateJaxwsSpecTimeoutSettings(Message message, HTTPClientPolicy csPolicy) {
+        final int resolvedReceiveTimeout = determineReceiveTimeout(message, csPolicy);
+        final boolean receiveUntouched = csPolicy.getReceiveTimeout() == 60000;
+        if (receiveUntouched) {
+            csPolicy.setReceiveTimeout(resolvedReceiveTimeout);
+        }
+
+        final int resolvedConnectionTimeout = determineConnectionTimeout(message, csPolicy);
+        final boolean connectionUntouched = csPolicy.getConnectionTimeout() == 30000;
+        if (connectionUntouched) {
+            csPolicy.setConnectionTimeout(resolvedConnectionTimeout);
+        }
+    }
+
+    /**
+     * Creates the stream the request body is written to. Messages flagged with
+     * {@link #USE_ASYNC} get an {@link AsyncWrappedOutputStream} that is also
+     * registered on the message's {@link CXFHttpRequest}; all other messages are
+     * delegated to the parent conduit.
+     */
+    @Override
+    protected OutputStream createOutputStream(Message message, boolean needToCacheRequest,
+                                              boolean isChunking, int chunkThreshold) throws IOException {
+        if (!Boolean.TRUE.equals(message.get(USE_ASYNC))) {
+            return super.createOutputStream(message, needToCacheRequest, isChunking, chunkThreshold);
+        }
+
+        final CXFHttpRequest request = message.get(CXFHttpRequest.class);
+        final AsyncWrappedOutputStream stream = new AsyncWrappedOutputStream(
+            message, needToCacheRequest, isChunking, chunkThreshold, getConduitName(), request.getUri());
+        request.setOutputStream(stream);
+        return stream;
+    }
+
+ public class AsyncWrappedOutputStream extends WrappedOutputStream
+ implements CopyingOutputStream, WritableByteChannel {
+ private final HTTPClientPolicy csPolicy;
+
+ private CXFHttpRequest entity;
+ private MutableHttpEntity basicEntity;
+
+ private boolean isAsync;
+ private SharedInputBuffer inbuf;
+ private SharedOutputBuffer outbuf;
+
+ // Objects for the response
+ private volatile HttpResponse httpResponse;
+ private volatile Exception exception;
+
+ private Future<Boolean> connectionFuture;
+
+ private Object sessionLock = new Object();
+ private boolean closed;
+
+        /**
+         * Binds the stream to the {@link CXFHttpRequest} stored on the message and
+         * allocates the shared input/output buffers. The buffer size is the policy's
+         * chunk length when that is positive, otherwise 16320.
+         */
+        public AsyncWrappedOutputStream(Message message, boolean needToCacheRequest, boolean isChunking,
+                                        int chunkThreshold, String conduitName, URI uri) {
+            super(message, needToCacheRequest, isChunking, chunkThreshold, conduitName, uri);
+
+            csPolicy = getClient(message);
+            entity = message.get(CXFHttpRequest.class);
+            basicEntity = (MutableHttpEntity)entity.getEntity();
+            basicEntity.setChunked(isChunking);
+
+            final int bufSize;
+            if (csPolicy.getChunkLength() > 0) {
+                bufSize = csPolicy.getChunkLength();
+            } else {
+                bufSize = 16320;
+            }
+            inbuf = new SharedInputBuffer(bufSize);
+            outbuf = new SharedOutputBuffer(bufSize);
+
+            // asynchronous only when there is an exchange and it is not synchronous
+            isAsync = outMessage != null
+                && outMessage.getExchange() != null
+                && !outMessage.getExchange().isSynchronous();
+        }
+
+        /**
+         * @return {@code true} when a cached copy of the written data exists
+         */
+        public boolean retransmitable() {
+            return null != cachedStream;
+        }
+
+        /**
+         * @return the cached stream, or {@code null} when none has been set
+         */
+        public CachedOutputStream getCachedStream() {
+            return this.cachedStream;
+        }
+
+        /**
+         * Copies the protocol headers of the outgoing message onto the request.
+         * <ul>
+         * <li>{@code Content-Type} is set on the entity rather than as a header.</li>
+         * <li>Cookies, and every header when {@link Headers#ADD_HEADERS_PROPERTY} is
+         *     enabled, are added one header line per value under their own name.</li>
+         * <li>Other headers, except {@code Content-Length}, replace any existing
+         *     header, with multiple values joined by a comma.</li>
+         * <li>A default {@code User-Agent} is supplied when none was provided.</li>
+         * </ul>
+         *
+         * @throws IOException declared for overriding purposes
+         */
+        protected void setProtocolHeaders() throws IOException {
+            final Headers h = new Headers(outMessage);
+            basicEntity.setContentType(h.determineContentType());
+
+            final boolean addHeaders = MessageUtils.getContextualBoolean(outMessage,
+                Headers.ADD_HEADERS_PROPERTY, false);
+            for (Map.Entry<String, List<String>> header : h.headerMap().entrySet()) {
+                final String name = header.getKey();
+                if (HttpHeaderHelper.CONTENT_TYPE.equalsIgnoreCase(name)) {
+                    continue;
+                }
+                if (addHeaders || HttpHeaderHelper.COOKIE.equalsIgnoreCase(name)) {
+                    // Use the header's own name: the previous code always used "Cookie"
+                    // here, which mislabelled every header once addHeaders was enabled.
+                    for (String value : header.getValue()) {
+                        entity.addHeader(name, value);
+                    }
+                } else if (!"Content-Length".equalsIgnoreCase(name)) {
+                    entity.setHeader(name, String.join(",", header.getValue()));
+                }
+            }
+            // Applied once after all headers are copied, so that it is also set for an
+            // empty header map and never duplicates a caller-provided User-Agent.
+            if (!entity.containsHeader("User-Agent")) {
+                entity.setHeader("User-Agent", Version.getCompleteVersionString());
+            }
+        }
+
+        /**
+         * Switches the entity to a non-chunked body of the given length.
+         *
+         * @param i the content length to announce
+         */
+        protected void setFixedLengthStreamingMode(int i) {
+            final MutableHttpEntity target = basicEntity;
+            target.setChunked(false);
+            target.setContentLength(i);
+        }
+
+        /**
+         * Applies the stream's chunking flag to the entity.
+         */
+        public void thresholdReached() throws IOException {
+            this.basicEntity.setChunked(this.chunking);
+        }
+
+        /**
+         * Starts the exchange without a request body and marks the output buffer
+         * as completed.
+         */
+        protected void handleNoOutput() throws IOException {
+            final boolean withBody = false;
+            connect(withBody);
+            outbuf.writeCompleted();
+        }
+
+        /**
+         * @return {@code true} until {@link #close()} has been called
+         */
+        public boolean isOpen() {
+            return closed ? false : true;
+        }
+
+        /**
+         * Writes the remaining bytes of {@code src}.
+         * <p>
+         * While the threshold buffer is active, bytes go there first (up to the
+         * threshold). The rest is written to the wrapped stream when caching for
+         * retransmission, otherwise to the shared output buffer.
+         *
+         * @param src the buffer to drain
+         * @return the number of bytes taken from {@code src}
+         * @throws IOException if the underlying write fails
+         */
+        public int write(ByteBuffer src) throws IOException {
+            int total = 0;
+            if (buffer != null) {
+                final int pos = buffer.size();
+                final int len = Math.min(this.threshold - pos, src.remaining());
+                src.get(buffer.getRawBytes(), pos, len);
+                buffer.setSize(buffer.size() + len);
+                total += len;
+                if (buffer.size() >= threshold) {
+                    thresholdReached();
+                    unBuffer();
+                }
+            }
+            if (cachingForRetransmission) {
+                // Copy through ByteBuffer.get(): unlike src.array() this works for direct
+                // and read-only buffers, honours arrayOffset, and advances the position
+                // as the WritableByteChannel contract requires.
+                final byte[] chunk = new byte[src.remaining()];
+                src.get(chunk);
+                wrappedStream.write(chunk, 0, chunk.length);
+                return chunk.length + total;
+            }
+            return outbuf.write(src) + total;
+        }
+
+        /**
+         * Copies the content of {@code in} into this stream.
+         * <p>
+         * As long as the threshold buffer is active it is filled directly from the
+         * input; if the input stops delivering data in that phase the method returns
+         * early. Afterwards the rest is copied to the wrapped stream (when caching
+         * for retransmission) or to the shared output buffer.
+         *
+         * @param in the source stream
+         * @return the number of bytes copied
+         * @throws IOException if reading or writing fails
+         */
+        public int copyFrom(InputStream in) throws IOException {
+            int copied = 0;
+            while (buffer != null) {
+                final int offset = buffer.size();
+                final int read = in.read(buffer.getRawBytes(), offset, this.threshold - offset);
+                if (read <= 0) {
+                    return copied;
+                }
+                buffer.setSize(offset + read);
+                if (buffer.size() >= threshold) {
+                    thresholdReached();
+                    unBuffer();
+                }
+                copied += read;
+            }
+
+            if (!cachingForRetransmission) {
+                copied += outbuf.copy(in);
+            } else {
+                copied += IOUtils.copy(in, wrappedStream);
+            }
+            return copied;
+        }
+
+        /**
+         * Closes the stream once; repeated calls are ignored.
+         * <p>
+         * For a non-chunked request whose body was collected in a
+         * {@link CachedOutputStream}, the collected size becomes the content length,
+         * the headers are handled, and the cached bytes are then written to the
+         * wrapped stream that is in place at that point.
+         */
+        @Override
+        public void close() throws IOException {
+            if (closed) {
+                return;
+            }
+            closed = true;
+
+            final boolean bodyWasCollected = !chunking && wrappedStream instanceof CachedOutputStream;
+            if (bodyWasCollected) {
+                final CachedOutputStream collected = (CachedOutputStream)wrappedStream;
+                basicEntity.setContentLength(collected.size());
+                // cleared before handling the headers; the field is read again below
+                wrappedStream = null;
+                handleHeadersTrustCaching();
+                collected.writeCacheTo(wrappedStream);
+            }
+            super.close();
+        }
+
+        /**
+         * On the first write, a non-chunked request starts collecting its body in a
+         * {@link CachedOutputStream}; a chunked request uses the parent behaviour.
+         */
+        @Override
+        protected void onFirstWrite() throws IOException {
+            if (!chunking) {
+                wrappedStream = new CachedOutputStream();
+                return;
+            }
+            super.onFirstWrite();
+        }
+
+        /**
+         * Starts the exchange with a request body and installs a wrapped stream that
+         * forwards all writes to the shared output buffer. A recorded
+         * {@link IOException} is rethrown on the next write. When caching for
+         * retransmission, the stream is additionally wrapped in a
+         * {@link CacheAndWriteOutputStream}.
+         */
+        protected void setupWrappedStream() throws IOException {
+            connect(true);
+            wrappedStream = new OutputStream() {
+                private void rethrowRecordedFailure() throws IOException {
+                    if (exception instanceof IOException) {
+                        throw (IOException) exception;
+                    }
+                }
+
+                public void write(byte[] data, int offset, int length) throws IOException {
+                    rethrowRecordedFailure();
+                    outbuf.write(data, offset, length);
+                }
+
+                public void write(int value) throws IOException {
+                    rethrowRecordedFailure();
+                    outbuf.write(value);
+                }
+
+                public void close() throws IOException {
+                    outbuf.writeCompleted();
+                }
+            };
+
+            // Only keep a copy of the data when it may have to be sent again.
+            if (!cachingForRetransmission) {
+                return;
+            }
+            cachedStream = new CacheAndWriteOutputStream(wrappedStream);
+            wrappedStream = cachedStream;
+        }
+
+        /**
+         * Submits the request to the asynchronous client. Only the first call has an
+         * effect; later calls return immediately.
+         * <p>
+         * The method wires the response and failure callbacks, prepares a client
+         * context with a credentials provider (falling back to the proxy
+         * authorization policy), builds the TLS strategy for {@code https} targets,
+         * resets the remembered SSL state when the target changed, and finally
+         * executes the request.
+         *
+         * @param output {@code false} when the request has no body; the entity and
+         *               its {@code Transfer-Encoding}/{@code Content-Type} headers
+         *               are then removed
+         * @throws IOException if the asynchronous client cannot be obtained
+         */
+        protected void connect(boolean output) throws IOException {
+            if (connectionFuture != null) {
+                return;
+            }
+
+            CXFResponseCallback responseCallback = new CXFResponseCallback() {
+                @Override
+                public void responseReceived(HttpResponse response) {
+                    setHttpResponse(response);
+                }
+
+            };
+
+            FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
+
+                public void completed(Boolean result) {
+                }
+
+                public void failed(Exception ex) {
+                    setException(ex);
+                    inbuf.shutdown();
+                    outbuf.shutdown();
+                }
+                public void cancelled() {
+                    handleCancelled();
+                    inbuf.shutdown();
+                    outbuf.shutdown();
+                }
+
+            };
+
+            if (!output) {
+                entity.removeHeaders("Transfer-Encoding");
+                entity.removeHeaders("Content-Type");
+                entity.setEntity(null);
+            }
+
+            HttpClientContext ctx = HttpClientContext.create();
+
+            BasicCredentialsProvider credsProvider = new BasicCredentialsProvider() {
+                @Override
+                public Credentials getCredentials(final AuthScope authscope, HttpContext context) {
+                    Credentials creds = super.getCredentials(authscope, context);
+
+                    if (creds != null) {
+                        return creds;
+                    }
+                    if (AsyncHTTPConduit.this.proxyAuthorizationPolicy != null
+                        && AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName() != null) {
+                        // A policy may carry a user name without a password; avoid the
+                        // NullPointerException that toCharArray() on null would raise.
+                        final String password = AsyncHTTPConduit.this.proxyAuthorizationPolicy.getPassword();
+                        return new UsernamePasswordCredentials(
+                            AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName(),
+                            password != null ? password.toCharArray() : null);
+                    }
+                    return null;
+                }
+
+            };
+
+            ctx.setCredentialsProvider(credsProvider);
+
+            if ("https".equals(url.getScheme())) {
+                try {
+                    // NOTE(review): the registry builder is populated below but nothing in
+                    // this method reads it afterwards - confirm how the TLS strategy is
+                    // meant to reach the client.
+                    RegistryBuilder<TlsStrategy> regBuilder = RegistryBuilder.<TlsStrategy>create();
+
+                    // check tlsClientParameters from message header
+                    TLSClientParameters tlsClientParameters = outMessage.get(TLSClientParameters.class);
+                    if (tlsClientParameters == null) {
+                        tlsClientParameters = getTlsClientParameters();
+                    }
+                    if (tlsClientParameters == null) {
+                        tlsClientParameters = new TLSClientParameters();
+                    }
+                    final SSLContext sslcontext = getSSLContext(tlsClientParameters);
+                    final HostnameVerifier verifier = org.apache.cxf.transport.https.SSLUtils
+                        .getHostnameVerifier(tlsClientParameters);
+                    regBuilder
+                        .register("https",
+                            new BasicClientTlsStrategy(
+                                sslcontext,
+                                new SSLSessionInitializer() {
+                                    @Override
+                                    public void initialize(NamedEndpoint endpoint, SSLEngine engine) {
+                                        initializeSSLEngine(sslcontext, engine);
+                                    }
+                                },
+                                new SSLSessionVerifier() {
+                                    @Override
+                                    public TlsDetails verify(NamedEndpoint endpoint, SSLEngine engine)
+                                        throws SSLException {
+                                        final SSLSession sslsession = engine.getSession();
+
+                                        if (!verifier.verify(endpoint.getHostName(), sslsession)) {
+                                            throw new SSLException("Could not verify host " + endpoint.getHostName());
+                                        }
+
+                                        setSSLSession(sslsession);
+                                        return new TlsDetails(sslsession, engine.getApplicationProtocol());
+                                    }
+                                }
+                            )
+                        );
+                } catch (final GeneralSecurityException e) {
+                    // keep the best-effort behaviour, but retain the stack trace
+                    LOG.log(Level.WARNING, e.getMessage(), e);
+                }
+            }
+
+            // a different scheme/host/port invalidates the remembered SSL state
+            if (sslURL != null && isSslTargetDifferent(sslURL, url)) {
+                sslURL = null;
+                sslState = null;
+                session = null;
+            }
+
+            if (tlsClientParameters != null && tlsClientParameters.hashCode() == lastTlsHash) {
+                ctx.setUserToken(sslState);
+            }
+
+            connectionFuture = new BasicFuture<>(callback);
+            final HttpAsyncClient c = getHttpAsyncClient();
+            final Credentials creds = (Credentials)outMessage.getContextualProperty(Credentials.class.getName());
+            if (creds != null) {
+                credsProvider.setCredentials(new AnyAuthScope(), creds);
+                ctx.setUserToken(creds.getUserPrincipal());
+            }
+            @SuppressWarnings("unchecked")
+            Registry<AuthSchemeFactory> asp = (Registry<AuthSchemeFactory>)outMessage
+                .getContextualProperty(AuthSchemeFactory.class.getName());
+            if (asp != null) {
+                ctx.setAuthSchemeRegistry(asp);
+            }
+
+            c.execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
+                      new CXFHttpAsyncResponseConsumer(this, inbuf, responseCallback),
+                      null, /* the push handler factory, optional and may be null */
+                      ctx,
+                      callback);
+        }
+
+        /**
+         * Compares scheme, host and port of two URIs.
+         *
+         * @param lastURL the URI the remembered SSL state belongs to
+         * @param url the URI of the current request
+         * @return {@code true} if any of the three parts differs
+         */
+        private boolean isSslTargetDifferent(URI lastURL, URI url) {
+            final boolean sameTarget = lastURL.getScheme().equals(url.getScheme())
+                && lastURL.getHost().equals(url.getHost())
+                && lastURL.getPort() == url.getPort();
+            return !sameTarget;
+        }
+
+        /**
+         * Publishes the response again if the stream is still in asynchronous mode.
+         *
+         * @param r the response to publish
+         * @return {@code true} if the stream is no longer in asynchronous mode once
+         *         the method finishes
+         */
+        protected boolean retrySetHttpResponse(HttpResponse r) {
+            if (isAsync) {
+                setHttpResponse(r);
+            }
+            // read again: publishing the response may have cleared the flag
+            return isAsync ? false : true;
+        }
+
+        /**
+         * Stores the response and wakes up threads waiting on this stream. In
+         * asynchronous mode the response processing is also started; if starting it
+         * fails, the flag stays set so that a later call can try again.
+         *
+         * @param r the received response
+         */
+        protected synchronized void setHttpResponse(HttpResponse r) {
+            this.httpResponse = r;
+            if (isAsync) {
+                // a response is available, so response processing can start now
+                try {
+                    handleResponseOnWorkqueue(false, true);
+                    // processing has been started; do not start it a second time
+                    isAsync = false;
+                } catch (Exception ignored) {
+                    // deliberately ignored, we'll try again on the next consume
+                }
+            }
+            notifyAll();
+        }
+
+        /**
+         * Records a failure of the exchange and wakes up threads waiting on this
+         * stream. In asynchronous mode the response processing is started so that
+         * the failure gets handled there.
+         *
+         * @param ex the failure reported for the exchange
+         */
+        protected synchronized void setException(Exception ex) {
+            exception = ex;
+            if (isAsync) {
+                // the exchange has ended with a failure, start the response processing now
+                try {
+                    handleResponseOnWorkqueue(false, true);
+                    isAsync = false; // don't trigger another start on next block. :-)
+                } catch (Exception ex2) {
+                    // report through the conduit logger instead of printing to stderr
+                    LOG.log(Level.WARNING, "Failed to start response processing after an exchange failure", ex2);
+                }
+            }
+            notifyAll();
+        }
+
+        /**
+         * Wakes up all threads waiting on this stream; called when the exchange is
+         * cancelled.
+         */
+        protected synchronized void handleCancelled() {
+            this.notifyAll();
+        }
+
+        /**
+         * Returns the response, waiting for it if neither a response nor a failure
+         * has been recorded yet.
+         * <p>
+         * If the wait ends without a response, both shared buffers are shut down and
+         * the recorded failure is thrown (wrapped in an {@link IOException} unless it
+         * already is one or is a {@link RuntimeException}); without a recorded
+         * failure a {@link SocketTimeoutException} is thrown.
+         *
+         * @return the received response
+         * @throws IOException if the exchange failed, no response arrived, or the
+         *                     waiting thread was interrupted
+         */
+        protected synchronized HttpResponse getHttpResponse() throws IOException {
+            while (httpResponse == null) {
+                if (exception == null) { //already have an exception, skip waiting
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        // restore the flag so callers further up can still see the interrupt
+                        Thread.currentThread().interrupt();
+                        throw new IOException(e);
+                    }
+                }
+                if (httpResponse == null) {
+                    outbuf.shutdown();
+                    inbuf.shutdown();
+
+                    if (exception != null) {
+                        if (exception instanceof IOException) {
+                            throw (IOException)exception;
+                        } else if (exception instanceof RuntimeException) {
+                            throw (RuntimeException)exception;
+                        }
+
+                        throw new IOException(exception);
+                    }
+
+                    throw new SocketTimeoutException("Read Timeout");
+                }
+            }
+            return httpResponse;
+        }
+
+        /**
+         * Puts the stream into asynchronous mode; the response processing is then
+         * started when a response or a failure is recorded.
+         */
+        protected void handleResponseAsync() throws IOException {
+            this.isAsync = true;
+        }
+
+        /**
+         * Reads and discards whatever is left in the input buffer, then closes and
+         * shuts the buffer down.
+         */
+        protected void closeInputStream() throws IOException {
+            final byte[] scratch = new byte[1024];
+            int read;
+            do {
+                read = inbuf.read(scratch);
+            } while (read > 0);
+            inbuf.close();
+            inbuf.shutdown();
+        }
+
+        /**
+         * @return a new stream view whose reads and close are forwarded to the
+         *         shared input buffer
+         */
+        protected synchronized InputStream getInputStream() throws IOException {
+            final InputStream view = new InputStream() {
+                public int read() throws IOException {
+                    return inbuf.read();
+                }
+
+                public int read(byte[] target) throws IOException {
+                    return inbuf.read(target);
+                }
+
+                public int read(byte[] target, int offset, int length) throws IOException {
+                    return inbuf.read(target, offset, length);
+                }
+
+                public void close() throws IOException {
+                    inbuf.close();
+                }
+            };
+            return view;
+        }
+
+        /**
+         * @return {@code true} if the request configuration names a proxy
+         */
+        protected boolean usingProxy() {
+            return null != this.entity.getConfig().getProxy();
+        }
+
+        /**
+         * Collects the TLS details of the connection.
+         * <p>
+         * For plain {@code http} the result is {@code null}. Otherwise the exchange
+         * is started if necessary and the method waits on the session lock, for at
+         * most the connection timeout of the policy, until an SSL session has been
+         * recorded. The host name is then verified against that session.
+         *
+         * @return the connection info, or {@code null} for plain {@code http}
+         * @throws IOException if no SSL session becomes available, the host cannot
+         *                     be verified, or the waiting thread is interrupted
+         */
+        protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
+            if ("http".equals(outMessage.get("http.scheme"))) {
+                return null;
+            }
+            connect(true);
+
+            // Work on one snapshot of the volatile field so that the null check and all
+            // later reads see the same session even if the field is reset concurrently.
+            final SSLSession current;
+            synchronized (sessionLock) {
+                if (session == null) {
+                    try {
+                        sessionLock.wait(csPolicy.getConnectionTimeout());
+                    } catch (InterruptedException e) {
+                        // restore the flag so callers further up can still see the interrupt
+                        Thread.currentThread().interrupt();
+                        throw new IOException(e);
+                    }
+                }
+                current = session;
+                if (current == null) {
+                    throw new IOException("No SSLSession detected");
+                }
+            }
+            HostnameVerifier verifier = org.apache.cxf.transport.https.SSLUtils
+                .getHostnameVerifier(tlsClientParameters);
+            if (!verifier.verify(url.getHost(), current)) {
+                throw new IOException("Could not verify host " + url.getHost());
+            }
+
+            String method = (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
+            String cipherSuite = current.getCipherSuite();
+            Certificate[] localCerts = current.getLocalCertificates();
+            Principal principal = current.getLocalPrincipal();
+            Certificate[] serverCerts = current.getPeerCertificates();
+            Principal peer = current.getPeerPrincipal();
+
+            return new HttpsURLConnectionInfo(url, method, cipherSuite, localCerts, principal, serverCerts, peer);
+        }
+
+ /**
+  * Returns the status code of the response supplied by {@code getHttpResponse()}.
+  */
+ protected int getResponseCode() throws IOException {
+ return getHttpResponse().getCode();
+ }
+
+ /**
+  * Returns the reason phrase of the response supplied by {@code getHttpResponse()}.
+  */
+ protected String getResponseMessage() throws IOException {
+ return getHttpResponse().getReasonPhrase();
+ }
+
+ private String readHeaders(Headers h) throws IOException {
+ Header[] headers = getHttpResponse().getHeaders();
+ h.headerMap().clear();
+ String ct = null;
+ for (Header header : headers) {
+ List<String> s = h.headerMap().get(header.getName());
+ if (s == null) {
+ s = new ArrayList<>(1);
+ h.headerMap().put(header.getName(), s);
+ }
+ s.add(header.getValue());
+ if ("Content-Type".equalsIgnoreCase(header.getName())) {
+ ct = header.getValue();
+ }
+ }
+ return ct;
+ }
+
+ protected void updateResponseHeaders(Message inMessage) throws IOException {
+ Headers h = new Headers(inMessage);
+ inMessage.put(Message.CONTENT_TYPE, readHeaders(h));
+ cookies.readFromHeaders(h);
+ }
+
+ /**
+  * Returns the body of a partial (200 or 202) response, if there is one.
+  * <p>
+  * A body is assumed when the response announces a positive {@code Content-Length}. For a
+  * chunked response, or one terminated by closing the connection, one byte is read ahead
+  * to find out whether any content follows.
+  *
+  * @return a stream over the response body, or {@code null} when the status code is
+  *         neither 200 nor 202 or no body could be detected
+  * @throws IOException if the response cannot be obtained
+  */
+ protected InputStream getPartialResponse() throws IOException {
+     InputStream in = null;
+     int responseCode = getResponseCode();
+     if (responseCode == HttpURLConnection.HTTP_ACCEPTED
+         || responseCode == HttpURLConnection.HTTP_OK) {
+
+         Header head = httpResponse.getFirstHeader(HttpHeaderHelper.CONTENT_LENGTH);
+         // Content-Length may legitimately exceed Integer.MAX_VALUE, so parse it as a long
+         long contentLength = 0;
+         if (head != null) {
+             contentLength = Long.parseLong(head.getValue());
+         }
+         head = httpResponse.getFirstHeader(HttpHeaderHelper.TRANSFER_ENCODING);
+         boolean isChunked = head != null && HttpHeaderHelper.CHUNKED.equalsIgnoreCase(head.getValue());
+         head = httpResponse.getFirstHeader(HttpHeaderHelper.CONNECTION);
+         boolean isEofTerminated = head != null && HttpHeaderHelper.CLOSE.equalsIgnoreCase(head.getValue());
+         if (contentLength > 0) {
+             in = getInputStream();
+         } else if (isChunked || isEofTerminated) {
+             // ensure chunked or EOF-terminated response is non-empty
+             try {
+                 PushbackInputStream pin =
+                     new PushbackInputStream(getInputStream());
+                 int c = pin.read();
+                 if (c != -1) {
+                     pin.unread((byte)c);
+                     in = pin;
+                 }
+             } catch (IOException ioe) {
+                 // best effort: a body that cannot be probed is reported as absent
+             }
+         }
+     }
+     return in;
+ }
+
+ protected void updateCookiesBeforeRetransmit() throws IOException {
+ Headers h = new Headers();
+ readHeaders(h);
+ cookies.readFromHeaders(h);
+ }
+
+ /**
+  * Retransmits the request after an authorization challenge.
+  * <p>
+  * When the superclass does not take care of the retransmission, the remaining response
+  * is drained, the cookies are written to the outgoing headers and the request is sent
+  * again to the same URL, leaving the negotiation to the HTTP client.
+  *
+  * @return always {@code true}
+  */
+ protected boolean authorizationRetransmit() throws IOException {
+     if (super.authorizationRetransmit()) {
+         return true;
+     }
+     //HTTPClient may be handling the authorization things instead of us, we
+     //just need to make sure we set the cookies and proceed and HC
+     //will do the negotiation and such.
+     try {
+         closeInputStream();
+     } catch (Throwable t) {
+         //ignore
+     }
+     cookies.writeToMessageHeaders(outMessage);
+     retransmit(url.toString());
+     return true;
+ }
+
+ /**
+  * Sends the cached request body again: a new wrapped stream is set up, the cache is
+  * written into it, and the stream is flushed and closed.
+  */
+ protected void retransmitStream() throws IOException {
+ cachingForRetransmission = false; //already cached
+ setupWrappedStream();
+ cachedStream.writeCacheTo(wrappedStream);
+ wrappedStream.flush();
+ wrappedStream.close();
+ }
+
+ protected void setupNewConnection(String newURL) throws IOException {
+ httpResponse = null;
+ isAsync = outMessage != null && outMessage.getExchange() != null
+ && !outMessage.getExchange().isSynchronous();
+ exception = null;
+ connectionFuture = null;
+ session = null;
+ sslState = null;
+ sslURL = null;
+
+ //reset the buffers
+ int bufSize = csPolicy.getChunkLength() > 0 ? csPolicy.getChunkLength() : 16320;
+ inbuf = new SharedInputBuffer(bufSize);
+ outbuf = new SharedOutputBuffer(bufSize);
+ try {
+ if (defaultAddress.getString().equals(newURL)) {
+ setupConnection(outMessage, defaultAddress, csPolicy);
+ } else {
+ Address address = new Address(newURL);
+ this.url = address.getURI();
+ setupConnection(outMessage, address, csPolicy);
+ }
+ entity = outMessage.get(CXFHttpRequest.class);
+ basicEntity = (MutableHttpEntity)entity.getEntity();
+ entity.setOutputStream(this);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ /**
+  * Stores the given SSL session and, while holding {@code sessionLock}, records the
+  * session's local principal and the current URL before waking every thread waiting on
+  * that lock.
+  *
+  * @param sslsession the session to store
+  */
+ public void setSSLSession(SSLSession sslsession) {
+ session = sslsession;
+ synchronized (sessionLock) {
+ sslState = sslsession.getLocalPrincipal();
+ sslURL = url;
+ sessionLock.notifyAll();
+ }
+ }
+
+ }
+
+ /**
+  * Returns the SSL context matching the given TLS parameters.
+  * <p>
+  * The context created last is reused as long as the hash code of the parameters is
+  * unchanged. Otherwise the context supplied by the parameters is taken or, when there is
+  * none, a new one is created and initialized from the configured (or default) key and
+  * trust managers. Whenever the cached context is replaced, the remembered SSL state, URL
+  * and session are cleared.
+  *
+  * @param tlsClientParameters the TLS configuration to honour
+  * @return the cached or newly selected context
+  * @throws GeneralSecurityException if the context cannot be created or initialized
+  */
+ public synchronized SSLContext getSSLContext(TLSClientParameters tlsClientParameters)
+     throws GeneralSecurityException {
+
+     final int paramsHash = tlsClientParameters.hashCode();
+     if (sslContext != null && paramsHash == lastTlsHash) {
+         return sslContext;
+     }
+
+     SSLContext ctx = tlsClientParameters.getSslContext();
+     if (ctx == null) {
+         final String provider = tlsClientParameters.getJsseProvider();
+
+         String protocol = tlsClientParameters.getSecureSocketProtocol();
+         if (protocol == null) {
+             protocol = "TLS";
+         }
+
+         if (provider == null) {
+             ctx = SSLContext.getInstance(protocol);
+         } else {
+             ctx = SSLContext.getInstance(protocol, provider);
+         }
+
+         KeyManager[] keyManagers = tlsClientParameters.getKeyManagers();
+         if (keyManagers == null) {
+             keyManagers = org.apache.cxf.configuration.jsse.SSLUtils.getDefaultKeyStoreManagers(LOG);
+         }
+         final KeyManager[] aliasAwareKeyManagers =
+             org.apache.cxf.transport.https.SSLUtils.configureKeyManagersWithCertAlias(
+                 tlsClientParameters, keyManagers);
+
+         TrustManager[] trustManagers = tlsClientParameters.getTrustManagers();
+         if (trustManagers == null) {
+             trustManagers = org.apache.cxf.configuration.jsse.SSLUtils.getDefaultTrustStoreManagers(LOG);
+         }
+
+         ctx.init(aliasAwareKeyManagers, trustManagers, tlsClientParameters.getSecureRandom());
+
+         if (ctx.getClientSessionContext() != null) {
+             ctx.getClientSessionContext().setSessionTimeout(tlsClientParameters.getSslCacheTimeout());
+         }
+     }
+
+     // remember the new context and forget everything tied to the previous one
+     sslContext = ctx;
+     lastTlsHash = paramsHash;
+     sslState = null;
+     sslURL = null;
+     session = null;
+     return ctx;
+ }
+
+ /**
+  * Applies the configured cipher suites and protocols to the given SSL engine.
+  * <p>
+  * Default TLS parameters are used when the conduit has none. The enabled protocols are
+  * only changed when the engine supports the requested protocol or a protocol whose name
+  * starts with it.
+  *
+  * @param sslcontext the context the engine was created from
+  * @param sslengine the engine to configure
+  */
+ public void initializeSSLEngine(SSLContext sslcontext, SSLEngine sslengine) {
+     TLSClientParameters params = getTlsClientParameters();
+     if (params == null) {
+         params = new TLSClientParameters();
+     }
+
+     final String[] enabledCipherSuites =
+         SSLUtils.getCiphersuitesToInclude(params.getCipherSuites(),
+                                           params.getCipherSuitesFilter(),
+                                           sslcontext.getSocketFactory().getDefaultCipherSuites(),
+                                           SSLUtils.getSupportedCipherSuites(sslcontext),
+                                           LOG);
+     sslengine.setEnabledCipherSuites(enabledCipherSuites);
+
+     String protocol = params.getSecureSocketProtocol();
+     if (protocol == null) {
+         protocol = sslcontext.getProtocol();
+     }
+
+     final String[] enabledProtocols = findProtocols(protocol, sslengine.getSupportedProtocols());
+     if (enabledProtocols != null) {
+         sslengine.setEnabledProtocols(enabledProtocols);
+     }
+ }
+
+ private String[] findProtocols(String p, String[] options) {
+ List<String> list = new ArrayList<>();
+ for (String s : options) {
+ if (s.equals(p)) {
+ return new String[] {p};
+ } else if (s.startsWith(p)) {
+ list.add(s);
+ }
+ }
+ if (list.isEmpty()) {
+ return null;
+ }
+ return list.toArray(new String[0]);
+ }
+
+}
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitFactory.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitFactory.java
new file mode 100644
index 0000000..1594d1a
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitFactory.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+ import java.io.IOException;
+ import java.net.URI;
+ import java.util.Map;
+ import java.util.logging.Level;
+ import java.util.logging.Logger;
+
+ import org.apache.cxf.Bus;
+ import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+ import org.apache.cxf.buslifecycle.BusLifeCycleManager;
+ import org.apache.cxf.common.injection.NoJSR250Annotations;
+ import org.apache.cxf.common.logging.LogUtils;
+ import org.apache.cxf.common.util.SystemPropertyAction;
+ import org.apache.cxf.service.model.EndpointInfo;
+ import org.apache.cxf.transport.http.HTTPConduit;
+ import org.apache.cxf.transport.http.HTTPConduitFactory;
+ import org.apache.cxf.transport.http.HTTPTransportFactory;
+ import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+ import org.apache.cxf.ws.addressing.EndpointReferenceType;
+ import org.apache.hc.client5.http.SystemDefaultDnsResolver;
+ import org.apache.hc.client5.http.cookie.BasicCookieStore;
+ import org.apache.hc.client5.http.cookie.Cookie;
+ import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
+ import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+ import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
+ import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+ import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+ import org.apache.hc.client5.http.protocol.RedirectStrategy;
+ import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
+ import org.apache.hc.core5.http.HttpRequest;
+ import org.apache.hc.core5.http.HttpResponse;
+ import org.apache.hc.core5.http.ProtocolException;
+ import org.apache.hc.core5.http.config.Registry;
+ import org.apache.hc.core5.http.config.RegistryBuilder;
+ import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+ import org.apache.hc.core5.http.protocol.HttpContext;
+ import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
+ import org.apache.hc.core5.pool.PoolReusePolicy;
+ import org.apache.hc.core5.reactor.IOReactorConfig;
+ import org.apache.hc.core5.reactor.IOReactorStatus;
+ import org.apache.hc.core5.util.TimeValue;
+ import org.apache.hc.core5.util.Timeout;
+
+/**
+ *
+ */
+@NoJSR250Annotations
+public class AsyncHTTPConduitFactory implements HTTPConduitFactory {
+
+ //TCP related properties
+ public static final String TCP_NODELAY = "org.apache.cxf.transport.http.async.TCP_NODELAY";
+ public static final String SO_KEEPALIVE = "org.apache.cxf.transport.http.async.SO_KEEPALIVE";
+ public static final String SO_LINGER = "org.apache.cxf.transport.http.async.SO_LINGER";
+ public static final String SO_TIMEOUT = "org.apache.cxf.transport.http.async.SO_TIMEOUT";
+
+ //ConnectionPool
+ public static final String MAX_CONNECTIONS = "org.apache.cxf.transport.http.async.MAX_CONNECTIONS";
+ public static final String MAX_PER_HOST_CONNECTIONS
+ = "org.apache.cxf.transport.http.async.MAX_PER_HOST_CONNECTIONS";
+ public static final String CONNECTION_TTL = "org.apache.cxf.transport.http.async.CONNECTION_TTL";
+ public static final String CONNECTION_MAX_IDLE = "org.apache.cxf.transport.http.async.CONNECTION_MAX_IDLE";
+
+ //AsycClient specific props
+ public static final String THREAD_COUNT = "org.apache.cxf.transport.http.async.ioThreadCount";
+ public static final String SELECT_INTERVAL = "org.apache.cxf.transport.http.async.selectInterval";
+
+ //CXF specific
+ public static final String USE_POLICY = "org.apache.cxf.transport.http.async.usePolicy";
+
+
+ public enum UseAsyncPolicy {
+ ALWAYS, ASYNC_ONLY, NEVER;
+
+ public static UseAsyncPolicy getPolicy(Object st) {
+ if (st instanceof UseAsyncPolicy) {
+ return (UseAsyncPolicy)st;
+ } else if (st instanceof String) {
+ String s = ((String)st).toUpperCase();
+ if ("ALWAYS".equals(s)) {
+ return ALWAYS;
+ } else if ("NEVER".equals(s)) {
+ return NEVER;
+ } else if ("ASYNC_ONLY".equals(s)) {
+ return ASYNC_ONLY;
+ } else {
+ st = Boolean.parseBoolean(s);
+ }
+ }
+ if (st instanceof Boolean) {
+ return ((Boolean)st).booleanValue() ? ALWAYS : NEVER;
+ }
+ return ASYNC_ONLY;
+ }
+ };
+
+ private volatile PoolingAsyncClientConnectionManager connectionManager;
+ private volatile CloseableHttpAsyncClient client;
+
+ private boolean isShutdown;
+ private UseAsyncPolicy policy;
+ private int maxConnections = 5000;
+ private int maxPerRoute = 1000;
+ private int connectionTTL = 60000;
+ private int connectionMaxIdle = 60000;
+
+ private int ioThreadCount = IOReactorConfig.DEFAULT.getIoThreadCount();
+ private long selectInterval = IOReactorConfig.DEFAULT.getSelectInterval().toMilliseconds();
+ private int soLinger = IOReactorConfig.DEFAULT.getSoLinger().toMillisecondsIntBound();
+ private int soTimeout = IOReactorConfig.DEFAULT.getSoTimeout().toMillisecondsIntBound();
+ private boolean soKeepalive = IOReactorConfig.DEFAULT.isSoKeepalive();
+ private boolean tcpNoDelay = true;
+
+ AsyncHTTPConduitFactory() {
+ super();
+ }
+
+ public AsyncHTTPConduitFactory(Map<String, Object> conf) {
+ this();
+ setProperties(conf);
+ }
+
+ public AsyncHTTPConduitFactory(Bus b) {
+ this();
+ addListener(b);
+ setProperties(b.getProperties());
+ }
+
+ public UseAsyncPolicy getUseAsyncPolicy() {
+ return policy;
+ }
+
+ public void update(Map<String, Object> props) {
+ if (setProperties(props) && client != null) {
+ restartReactor();
+ }
+ }
+
+ private void restartReactor() {
+ CloseableHttpAsyncClient client2 = client;
+ resetVars();
+ shutdown(client2);
+ }
+ private synchronized void resetVars() {
+ client = null;
+ connectionManager = null;
+ }
+
+ private boolean setProperties(Map<String, Object> s) {
+ //properties that can be updated "live"
+ if (s == null) {
+ return false;
+ }
+ Object st = s.get(USE_POLICY);
+ if (st == null) {
+ st = SystemPropertyAction.getPropertyOrNull(USE_POLICY);
+ }
+ policy = UseAsyncPolicy.getPolicy(st);
+
+ maxConnections = getInt(s.get(MAX_CONNECTIONS), maxConnections);
+ connectionTTL = getInt(s.get(CONNECTION_TTL), connectionTTL);
+ connectionMaxIdle = getInt(s.get(CONNECTION_MAX_IDLE), connectionMaxIdle);
+ maxPerRoute = getInt(s.get(MAX_PER_HOST_CONNECTIONS), maxPerRoute);
+
+ if (connectionManager != null) {
+ connectionManager.setMaxTotal(maxConnections);
+ connectionManager.setDefaultMaxPerRoute(maxPerRoute);
+ }
+
+ //properties that need a restart of the reactor
+ boolean changed = false;
+
+ int i = ioThreadCount;
+ ioThreadCount = getInt(s.get(THREAD_COUNT), Runtime.getRuntime().availableProcessors());
+ changed |= i != ioThreadCount;
+
+ long l = selectInterval;
+ selectInterval = getInt(s.get(SELECT_INTERVAL), 1000);
+ changed |= l != selectInterval;
+
+ i = soLinger;
+ soLinger = getInt(s.get(SO_LINGER), -1);
+ changed |= i != soLinger;
+
+ i = soTimeout;
+ soTimeout = getInt(s.get(SO_TIMEOUT), 0);
+ changed |= i != soTimeout;
+
+ boolean b = tcpNoDelay;
+ tcpNoDelay = getBoolean(s.get(TCP_NODELAY), true);
+ changed |= b != tcpNoDelay;
+
+ b = soKeepalive;
+ soKeepalive = getBoolean(s.get(SO_KEEPALIVE), false);
+ changed |= b != soKeepalive;
+
+ return changed;
+ }
+
+ private int getInt(Object s, int defaultv) {
+ int i = defaultv;
+ if (s instanceof String) {
+ i = Integer.parseInt((String)s);
+ } else if (s instanceof Number) {
+ i = ((Number)s).intValue();
+ }
+ if (i == -1) {
+ i = defaultv;
+ }
+ return i;
+ }
+
+ private boolean getBoolean(Object s, boolean defaultv) {
+ if (s instanceof String) {
+ return Boolean.parseBoolean((String)s);
+ } else if (s instanceof Boolean) {
+ return ((Boolean)s).booleanValue();
+ }
+ return defaultv;
+ }
+
+ public boolean isShutdown() {
+ return isShutdown;
+ }
+
+ @Override
+ public HTTPConduit createConduit(HTTPTransportFactory f, Bus bus, EndpointInfo localInfo,
+ EndpointReferenceType target) throws IOException {
+ return createConduit(bus, localInfo, target);
+ }
+
+ public HTTPConduit createConduit(Bus bus, EndpointInfo localInfo,
+ EndpointReferenceType target) throws IOException {
+ if (isShutdown) {
+ return null;
+ }
+ return new AsyncHTTPConduit(bus, localInfo, target, this);
+ }
+
+ public void shutdown() {
+ if (client != null) {
+ shutdown(client);
+ connectionManager = null;
+ client = null;
+ }
+ isShutdown = true;
+ }
+
+ private static void shutdown(CloseableHttpAsyncClient client) {
+ try {
+ client.close();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+
+ private void addListener(Bus b) {
+ BusLifeCycleManager manager = b.getExtension(BusLifeCycleManager.class);
+ if (manager != null) {
+ manager.registerLifeCycleListener(new BusLifeCycleListener() {
+ public void initComplete() {
+ }
+ public void preShutdown() {
+ shutdown();
+ }
+ public void postShutdown() {
+ }
+ });
+ }
+ }
+
+ public synchronized void setupNIOClient(HTTPClientPolicy clientPolicy) {
+ if (client != null) {
+ return;
+ }
+
+ final IOReactorConfig config = IOReactorConfig.custom()
+ .setIoThreadCount(ioThreadCount)
+ .setSelectInterval(TimeValue.ofMilliseconds(selectInterval))
+ .setSoLinger(TimeValue.ofMilliseconds(soLinger))
+ .setSoTimeout(Timeout.ofMilliseconds(soTimeout))
+ .setSoKeepAlive(soKeepalive)
+ .setTcpNoDelay(tcpNoDelay)
+ .build();
+
+ final Registry<TlsStrategy> tlsStrategy = RegistryBuilder.<TlsStrategy>create()
+ .register("https", DefaultClientTlsStrategy.getSystemDefault())
+ .build();
+
+ connectionManager = new PoolingAsyncClientConnectionManager(
+ tlsStrategy,
+ PoolConcurrencyPolicy.STRICT,
+ PoolReusePolicy.LIFO,
+ TimeValue.ofMilliseconds(connectionTTL),
+ DefaultSchemePortResolver.INSTANCE,
+ SystemDefaultDnsResolver.INSTANCE);
+
+ connectionManager.setDefaultMaxPerRoute(maxPerRoute);
+ connectionManager.setMaxTotal(maxConnections);
+
+ final RedirectStrategy redirectStrategy = new RedirectStrategy() {
+ public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context)
+ throws ProtocolException {
+ return false;
+ }
+ public URI getLocationURI(HttpRequest request, HttpResponse response, HttpContext context)
+ throws ProtocolException {
+ return null;
+ }
+ };
+
+ final HttpAsyncClientBuilder httpAsyncClientBuilder = HttpAsyncClients
+ .custom()
+ .setConnectionManager(connectionManager)
+ .setRedirectStrategy(redirectStrategy)
+ .setDefaultCookieStore(new BasicCookieStore() {
+ private static final long serialVersionUID = 1L;
+ public void addCookie(Cookie cookie) {
+ }
+ });
+
+ adaptClientBuilder(httpAsyncClientBuilder);
+
+ client = httpAsyncClientBuilder
+ .setIOReactorConfig(config)
+ .build();
+ // Start the client thread
+ client.start();
+ //Always start the idle checker thread to validate pending requests and
+ //use the ConnectionMaxIdle to close the idle connection
+ new CloseIdleConnectionThread(connectionManager, client).start();
+ }
+
+ //provide a hook to customize the builder
+ protected void adaptClientBuilder(HttpAsyncClientBuilder httpAsyncClientBuilder) {
+ }
+
+ public CloseableHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException {
+ if (client == null) {
+ setupNIOClient(c.getClient());
+ }
+ return client;
+ }
+
+ int getMaxConnections() {
+ return maxConnections;
+ }
+
+ public class CloseIdleConnectionThread extends Thread {
+ private final PoolingAsyncClientConnectionManager connMgr;
+ private final CloseableHttpAsyncClient client;
+
+ public CloseIdleConnectionThread(PoolingAsyncClientConnectionManager connMgr, CloseableHttpAsyncClient client) {
+ super("CXFCloseIdleConnectionThread");
+ this.connMgr = connMgr;
+ this.client = client;
+ }
+
+ @Override
+ public void run() {
+ long nextIdleCheck = System.currentTimeMillis() + connectionMaxIdle;
+ try {
+ while (client.getStatus() == IOReactorStatus.ACTIVE) {
+ synchronized (this) {
+ sleep(selectInterval);
+
+ if (connectionTTL == 0
+ && connectionMaxIdle > 0 && System.currentTimeMillis() >= nextIdleCheck) {
+ nextIdleCheck += connectionMaxIdle;
+ // close connections
+ // that have been idle longer than specified connectionMaxIdle
+ connMgr.closeIdle(TimeValue.ofMilliseconds(connectionMaxIdle));
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+ // terminate
+ }
+ }
+ }
+}
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpTransportFactory.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpTransportFactory.java
new file mode 100644
index 0000000..f481695
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpTransportFactory.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.configuration.Configurer;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractTransportFactory;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.HTTPConduitConfigurer;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+/**
+ * The transport factory is the same as for Apache HttpClient 4.x, sharing the same namespaces and
+ * URIs.
+ */
+ public class AsyncHttpTransportFactory extends AbstractTransportFactory implements ConduitInitiator {
+
+     public static final List<String> DEFAULT_NAMESPACES = Collections.unmodifiableList(Arrays
+         .asList("http://cxf.apache.org/transports/http/http-client"));
+
+     /**
+      * This constant holds the prefixes served by this factory. It is read-only because it
+      * is handed out to callers by {@link #getUriPrefixes()}.
+      */
+     private static final Set<String> URI_PREFIXES =
+         Collections.unmodifiableSet(new HashSet<>(Arrays.asList("hc://", "hc5://")));
+
+     private AsyncHTTPConduitFactory factory = new AsyncHTTPConduitFactory();
+
+     public AsyncHttpTransportFactory() {
+         super(DEFAULT_NAMESPACES);
+     }
+
+     /**
+      * Sets the conduit factory used when the bus does not provide one as an extension.
+      *
+      * @param f the fallback conduit factory
+      */
+     public void setAsyncHTTPConduitFactory(AsyncHTTPConduitFactory f) {
+         factory = f;
+     }
+
+     /**
+      * This call is used by CXF ExtensionManager to inject the activationNamespaces
+      * @param ans The transport ids.
+      */
+     public void setActivationNamespaces(Collection<String> ans) {
+         setTransportIds(new ArrayList<>(ans));
+     }
+
+     /**
+      * @return the URI prefixes served by this factory, as an unmodifiable set
+      */
+     public Set<String> getUriPrefixes() {
+         return URI_PREFIXES;
+     }
+
+     protected void configure(Bus b, Object bean) {
+         configure(b, bean, null, null);
+     }
+
+     /**
+      * Lets the {@link Configurer} of the bus, if there is one, configure the bean under the
+      * given name and, when present, under the extra name as well.
+      */
+     protected void configure(Bus bus, Object bean, String name, String extraName) {
+         Configurer configurer = bus.getExtension(Configurer.class);
+         if (null != configurer) {
+             configurer.configureBean(name, bean);
+             if (extraName != null) {
+                 configurer.configureBean(extraName, bean);
+             }
+         }
+     }
+
+     /**
+      * Returns the endpoint address without the {@code hc://} or {@code hc5://} prefix.
+      *
+      * @param endpointInfo the endpoint whose address is read
+      * @return the address with the prefix removed; the unchanged address if it carries
+      *         neither prefix; {@code null} if the endpoint has no address
+      */
+     protected String getAddress(EndpointInfo endpointInfo) {
+         String address = endpointInfo.getAddress();
+         if (address == null) {
+             return null;
+         }
+         if (address.startsWith("hc://")) {
+             address = address.substring(5);
+         } else if (address.startsWith("hc5://")) {
+             address = address.substring(6);
+         }
+         return address;
+     }
+
+     @Override
+     public Conduit getConduit(EndpointInfo endpointInfo, Bus bus) throws IOException {
+         return getConduit(endpointInfo, endpointInfo.getTarget(), bus);
+     }
+
+     /**
+      * Creates and configures a conduit for the endpoint. The conduit factory registered on
+      * the bus is preferred over the factory held by this instance.
+      *
+      * @throws IOException if the conduit factory does not produce a conduit, which is the
+      *         case once it has been shut down
+      */
+     @Override
+     public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target, Bus bus)
+         throws IOException {
+
+         // need to update the endpointInfo
+         final String strippedAddress = getAddress(endpointInfo);
+         if (strippedAddress != null) {
+             endpointInfo.setAddress(strippedAddress);
+         }
+
+         AsyncHTTPConduitFactory fact = bus.getExtension(AsyncHTTPConduitFactory.class);
+         if (fact == null) {
+             fact = factory;
+         }
+         HTTPConduit conduit = fact.createConduit(bus, endpointInfo, target);
+         if (conduit == null) {
+             // fail with a clear message instead of a NullPointerException further down
+             throw new IOException("No conduit could be created for " + strippedAddress
+                 + ": the conduit factory has been shut down");
+         }
+
+         // Spring configure the conduit.
+         String address = conduit.getAddress();
+         if (address != null && address.indexOf('?') != -1) {
+             address = address.substring(0, address.indexOf('?'));
+         }
+         HTTPConduitConfigurer c1 = bus.getExtension(HTTPConduitConfigurer.class);
+         if (c1 != null) {
+             c1.configure(conduit.getBeanName(), address, conduit);
+         }
+         configure(bus, conduit, conduit.getBeanName(), address);
+         conduit.finalizeConfig();
+         return conduit;
+     }
+ }
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncRequestProducer.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncRequestProducer.java
new file mode 100644
index 0000000..a3e0862
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncRequestProducer.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.nio.AsyncRequestProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.RequestChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+ /**
+  * Request producer that feeds the body of a {@link CXFHttpRequest} to the HTTP client.
+  * <p>
+  * The body normally comes from the {@link SharedOutputBuffer}. Once {@link #resetRequest()}
+  * has picked up the cached stream of a retransmittable request, the body is replayed from
+  * that cache instead, either from memory or from the cache's temporary file.
+  */
+ public class CXFHttpAsyncRequestProducer implements AsyncRequestProducer {
+     private final CXFHttpRequest request;
+     private final SharedOutputBuffer buf;
+     private volatile CachedOutputStream content;
+     private volatile ByteBuffer buffer;
+     private volatile InputStream fis;
+     private volatile ReadableByteChannel chan;
+
+     public CXFHttpAsyncRequestProducer(final CXFHttpRequest request, final SharedOutputBuffer buf) {
+         super();
+         this.buf = buf;
+         this.request = request;
+     }
+
+     /**
+      * @return the host addressed by the request URI
+      * @throws IllegalStateException if the request has no URI or the URI is not absolute
+      */
+     public HttpHost getTarget() {
+         URI uri = request.getUri();
+         if (uri == null) {
+             throw new IllegalStateException("Request URI is null");
+         }
+         if (!uri.isAbsolute()) {
+             throw new IllegalStateException("Request URI is not absolute");
+         }
+         return new HttpHost(uri.getScheme(), uri.getHost(), uri.getPort());
+     }
+
+     public HttpRequest generateRequest() throws IOException, HttpException {
+         return request;
+     }
+
+     /**
+      * Writes the next part of the request body to the channel.
+      * <p>
+      * When replaying cached content, bytes the channel did not accept stay in the buffer
+      * and are offered again on the next call; the buffer is only refilled from the file
+      * once it has been drained. The stream is ended when all content has been written.
+      */
+     @Override
+     public void produce(DataStreamChannel channel) throws IOException {
+         if (content == null) {
+             buf.produceContent(channel);
+             return;
+         }
+         if (buffer == null) {
+             if (content.getTempFile() == null) {
+                 buffer = ByteBuffer.wrap(content.getBytes());
+             } else {
+                 fis = content.getInputStream();
+                 chan = (fis instanceof FileInputStream)
+                     ? ((FileInputStream)fis).getChannel() : Channels.newChannel(fis);
+                 final ByteBuffer chunk = ByteBuffer.allocate(8 * 1024);
+                 // nothing has been read from the file yet: start out drained
+                 ((Buffer)chunk).limit(0);
+                 buffer = chunk;
+             }
+         }
+         final ByteBuffer pending = buffer;
+         final ReadableByteChannel source = chan;
+
+         // in-memory content is complete from the start, file content once read() hits EOF
+         boolean exhausted = source == null;
+         if (source != null && !pending.hasRemaining()) {
+             // the casts keep the calls bound to Buffer's methods, so the class also links
+             // on Java 8 when it was compiled against a newer JDK
+             ((Buffer)pending).clear();
+             exhausted = source.read(pending) == -1;
+             ((Buffer)pending).flip();
+         }
+         channel.write(pending);
+         if (exhausted && !pending.hasRemaining()) {
+             channel.endStream();
+         }
+     }
+
+     public void requestCompleted(final HttpContext context) {
+         releaseCachedContent();
+     }
+
+     @Override
+     public void failed(final Exception ex) {
+         buf.shutdown();
+     }
+
+     @Override
+     public boolean isRepeatable() {
+         return request.getOutputStream().retransmitable();
+     }
+
+     /**
+      * Switches to replaying the cached request body if the request can be retransmitted.
+      */
+     public void resetRequest() throws IOException {
+         if (request.getOutputStream().retransmitable()) {
+             content = request.getOutputStream().getCachedStream();
+         }
+     }
+
+     @Override
+     public int available() {
+         return 0;
+     }
+
+     @Override
+     public void releaseResources() {
+         buf.close();
+         releaseCachedContent();
+     }
+
+     @Override
+     public void sendRequest(RequestChannel channel, HttpContext context) throws HttpException, IOException {
+         channel.sendRequest(request, request.getEntity(), context);
+     }
+
+     /** Closes the stream over the cached content, if open, and drops the replay buffer. */
+     private void releaseCachedContent() {
+         if (fis != null) {
+             try {
+                 fis.close();
+             } catch (IOException io) {
+                 //ignore
+             }
+             chan = null;
+             fis = null;
+         }
+         buffer = null;
+     }
+ }
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java
new file mode 100644
index 0000000..297bc8f
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpAsyncResponseConsumer.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit.AsyncWrappedOutputStream;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+public class CXFHttpAsyncResponseConsumer implements AsyncResponseConsumer<Boolean> {
+ private final SharedInputBuffer buf;
+ private final AsyncWrappedOutputStream outstream;
+ private final CXFResponseCallback responseCallback;
+
+ private volatile boolean completed;
+ private volatile Exception exception;
+ private volatile HttpResponse response;
+
+ public CXFHttpAsyncResponseConsumer(
+ final AsyncWrappedOutputStream asyncWrappedOutputStream,
+ final SharedInputBuffer buf,
+ final CXFResponseCallback responseCallback) {
+ super();
+ this.outstream = asyncWrappedOutputStream;
+ this.responseCallback = responseCallback;
+ this.buf = buf;
+ }
+
+ @Override
+ public void releaseResources() {
+ buf.close();
+ }
+
+ @Override
+ public void updateCapacity(CapacityChannel capacityChannel) throws IOException {
+ capacityChannel.update(Integer.MAX_VALUE);
+ }
+
+ @Override
+ public void consumeResponse(HttpResponse resp, EntityDetails entityDetails, HttpContext context,
+ FutureCallback<Boolean> resultCallback) throws HttpException, IOException {
+ response = resp;
+ responseCallback.responseReceived(response);
+ resultCallback.completed(true);
+ }
+
+ @Override
+ public void consume(ByteBuffer src) throws IOException {
+ // Replicating HttpClient 4.x behavior. Try to gently feed more
+ // data to the event dispatcher if the session input buffer has
+ // not been fully exhausted (the choice of 5 iterations is purely arbitrary)
+ // or work queue is not ready to process the response.
+ for (int i = 0; i < 5; i++) {
+ // Only consume content when the work was accepted by the work queue
+ if (outstream.retrySetHttpResponse(response)) {
+ buf.consumeContent(src);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ completed = true;
+ exception = ex;
+ buf.shutdown();
+ }
+
+ @Override
+ public void streamEnd(List<? extends Header> trailers) throws HttpException, IOException {
+ completed = true;
+ buf.close();
+ }
+
+ @Override
+ public void informationResponse(HttpResponse resp, HttpContext context) throws HttpException, IOException {
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public boolean isCompleted() {
+ return completed;
+ }
+}
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpRequest.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpRequest.java
new file mode 100644
index 0000000..037bb88
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFHttpRequest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit.AsyncWrappedOutputStream;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.config.Configurable;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.core5.http.HttpEntity;
+
+ /**
+  * Request that carries its own entity, request configuration and the
+  * {@link AsyncWrappedOutputStream} it is associated with.
+  */
+ public class CXFHttpRequest extends HttpUriRequestBase implements Configurable {
+     private static final long serialVersionUID = 1L;
+
+     private AsyncWrappedOutputStream out;
+     private HttpEntity entity;
+     private RequestConfig config;
+
+     /**
+      * @param method the HTTP method name
+      * @param uri the request URI
+      */
+     public CXFHttpRequest(String method, URI uri) {
+         super(method, uri);
+     }
+
+     /** @return the output stream previously set, or {@code null} */
+     public AsyncWrappedOutputStream getOutputStream() {
+         return this.out;
+     }
+
+     /** @param o the output stream to associate with this request */
+     public void setOutputStream(AsyncWrappedOutputStream o) {
+         this.out = o;
+     }
+
+     /** @return the entity held by this request, or {@code null} */
+     public HttpEntity getEntity() {
+         return this.entity;
+     }
+
+     /** @param entity the entity to hold in this request */
+     public void setEntity(final HttpEntity entity) {
+         this.entity = entity;
+     }
+
+     /** @return the request configuration previously set, or {@code null} */
+     @Override
+     public RequestConfig getConfig() {
+         return this.config;
+     }
+
+     /** @param config the request configuration to return from {@link #getConfig()} */
+     public void setConfig(RequestConfig config) {
+         this.config = config;
+     }
+
+     /**
+      * Returns the request URI without a checked exception.
+      *
+      * @throws IllegalArgumentException wrapping the {@link URISyntaxException} raised by the superclass
+      */
+     @Override
+     public URI getUri() {
+         final URI uri;
+         try {
+             uri = super.getUri();
+         } catch (final URISyntaxException ex) {
+             throw new IllegalArgumentException(ex.getMessage(), ex);
+         }
+         return uri;
+     }
+ }
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFResponseCallback.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFResponseCallback.java
new file mode 100644
index 0000000..9761123
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/CXFResponseCallback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import org.apache.hc.core5.http.HttpResponse;
+
+ /**
+  * Package-private callback notified with an HTTP response.
+  */
+ interface CXFResponseCallback {
+     /**
+      * @param response the response that was received
+      */
+     void responseReceived(HttpResponse response);
+ }
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/MutableHttpEntity.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/MutableHttpEntity.java
new file mode 100644
index 0000000..288fa46
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/MutableHttpEntity.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hc.core5.function.Supplier;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpEntity;
+
+abstract class MutableHttpEntity implements HttpEntity {
+ static final int OUTPUT_BUFFER_SIZE = 4096;
+
+ private InputStream content;
+ private String contentType;
+ private String contentEncoding;
+ private boolean chunked;
+ private long length;
+
+ MutableHttpEntity(final String contentType, final String contentEncoding, final boolean chunked) {
+ this.contentType = contentType;
+ this.contentEncoding = contentEncoding;
+ this.chunked = chunked;
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return contentEncoding;
+ }
+
+ @Override
+ public InputStream getContent() throws IOException, UnsupportedOperationException {
+ return content;
+ }
+
+ @Override
+ public boolean isStreaming() {
+ return false;
+ }
+
+ @Override
+ public long getContentLength() {
+ return length;
+ }
+
+ public void setContentLength(long l) {
+ this.length = l;
+ }
+
+ public void setChunked(boolean chunked) {
+ this.chunked = chunked;
+ }
+
+ @Override
+ public boolean isChunked() {
+ return chunked;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Supplier<List<? extends Header>> getTrailers() {
+ return null;
+ }
+
+ @Override
+ public Set<String> getTrailerNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public static void writeTo(final HttpEntity entity, final OutputStream outStream) throws IOException {
+ try (InputStream inStream = entity.getContent()) {
+ if (inStream != null) {
+ int count;
+ final byte[] tmp = new byte[OUTPUT_BUFFER_SIZE];
+ while ((count = inStream.read(tmp)) != -1) {
+ outStream.write(tmp, 0, count);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void writeTo(final OutputStream outStream) throws IOException {
+ writeTo(this, outStream);
+ }
+}
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/SharedInputBuffer.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/SharedInputBuffer.java
new file mode 100644
index 0000000..994b6ce
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/SharedInputBuffer.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
+
+/**
+ * Content buffer that can be shared by multiple threads, usually the I/O dispatch of
+ * an I/O reactor and a worker thread.
+ * <p/>
+ * The I/O dispatch thread is expect to transfer data from {@link ByteBuffer} to the buffer
+ * by calling {@link #consumeContent(ByteBuffer)}.
+ * <p/>
+ * The worker thread is expected to read the data from the buffer by calling
+ * {@link #read()} or {@link #read(byte[], int, int)} methods.
+ * <p/>
+ * In case of an abnormal situation or when no longer needed the buffer must be shut down
+ * using {@link #shutdown()} method.
+ */
+public class SharedInputBuffer extends ExpandableBuffer {
+
+ private final ReentrantLock lock;
+ private final Condition condition;
+
+ private volatile boolean shutdown;
+ private volatile boolean endOfStream;
+
+ private volatile ByteBuffer waitingBuffer;
+
+ public SharedInputBuffer(int buffersize) {
+ super(buffersize);
+ this.lock = new ReentrantLock();
+ this.condition = this.lock.newCondition();
+ }
+
+ public void reset() {
+ if (this.shutdown) {
+ return;
+ }
+ this.lock.lock();
+ try {
+ clear();
+ this.endOfStream = false;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public int consumeContent(final ByteBuffer buffer) throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ this.lock.lock();
+ try {
+ setInputMode();
+ int totalRead = 0;
+ int bytesRead;
+ if (waitingBuffer != null && buffer().position() == 0) {
+ while ((bytesRead = transfer(buffer, this.waitingBuffer)) > 0) {
+ totalRead += bytesRead;
+ }
+ }
+ //read more
+ while ((bytesRead = transfer(buffer, buffer())) > 0) {
+ totalRead += bytesRead;
+ }
+
+ if (bytesRead == -1) {
+ this.endOfStream = true;
+ }
+
+ this.condition.signalAll();
+
+ if (totalRead > 0) {
+ return totalRead;
+ }
+ if (this.endOfStream) {
+ return -1;
+ }
+ return 0;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean hasData() {
+ this.lock.lock();
+ try {
+ return super.hasData();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public int capacity() {
+ this.lock.lock();
+ try {
+ return super.capacity();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public int length() {
+ this.lock.lock();
+ try {
+ return super.length();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ protected void waitForData(int waitPos) throws IOException {
+ this.lock.lock();
+ try {
+ try {
+ while (true) {
+ if (this.waitingBuffer != null && this.waitingBuffer.position() > waitPos) {
+ return;
+ }
+ if (super.hasData()) {
+ return;
+ }
+ if (this.endOfStream) {
+ return;
+ }
+ if (this.shutdown) {
+ throw new InterruptedIOException("Input operation aborted");
+ }
+ this.condition.await();
+ }
+ } catch (InterruptedException ex) {
+ throw new IOException("Interrupted while waiting for more data");
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public void close() {
+ if (this.shutdown) {
+ return;
+ }
+ this.endOfStream = true;
+ this.lock.lock();
+ try {
+ this.condition.signalAll();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public void shutdown() {
+ if (this.shutdown) {
+ return;
+ }
+ this.shutdown = true;
+ this.lock.lock();
+ try {
+ this.condition.signalAll();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ protected boolean isShutdown() {
+ return this.shutdown;
+ }
+
+ protected boolean isEndOfStream() {
+ return this.shutdown || (!hasData() && this.endOfStream);
+ }
+
+ public int read() throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ this.lock.lock();
+ try {
+ if (!super.hasData()) {
+ waitForData(0);
+ }
+ if (isEndOfStream()) {
+ return -1;
+ }
+ setOutputMode();
+ return buffer().get() & 0xff;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public int read(final byte[] b, int off, int len) throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ if (b == null) {
+ return 0;
+ }
+ this.lock.lock();
+ try {
+ if (!hasData()) {
+ this.waitingBuffer = ByteBuffer.wrap(b, off, len);
+ waitForData(off);
+ int i = waitingBuffer.position() - off;
+ waitingBuffer = null;
+ if (i > 0) {
+ //++waitCnt;
+ return i;
+ }
+ }
+ if (isEndOfStream()) {
+ return -1;
+ }
+ setOutputMode();
+ int chunk = len;
+ if (chunk > buffer().remaining()) {
+ chunk = buffer().remaining();
+ }
+ buffer().get(b, off, chunk);
+ return chunk;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public int read(final byte[] b) throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ if (b == null) {
+ return 0;
+ }
+ return read(b, 0, b.length);
+ }
+
+ private int transfer(ByteBuffer from, ByteBuffer to) {
+ int transfer = Math.min(to.remaining(), from.remaining());
+ if (from.remaining() == 0) {
+ return -1;
+ }
+
+ // use a duplicated buffer so we don't disrupt the limit of the original buffer
+ final ByteBuffer tmp = from.duplicate();
+ tmp.limit(tmp.position() + transfer);
+ to.put(tmp);
+
+ // now discard the data we've copied from the original source (optional)
+ from.position(from.position() + transfer);
+ return transfer;
+ }
+
+}
diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/SharedOutputBuffer.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/SharedOutputBuffer.java
new file mode 100644
index 0000000..42ddc13
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/SharedOutputBuffer.java
@@ -0,0 +1,327 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hc.core5.http.impl.nio.ExpandableBuffer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+
+
+/**
+ * Content buffer that can be shared by multiple threads, usually the I/O dispatch of
+ * an I/O reactor and a worker thread.
+ * <p/>
+ * The I/O dispatch thread is expected to transfer data from the buffer to
+ * {@link DataStreamChannel} by calling {@link #produceContent(DataStreamChannel)}.
+ * <p/>
+ * The worker thread is expected to write data to the buffer by calling
+ * {@link #write(int)}, {@link #write(byte[], int, int)} or {@link #writeCompleted()}
+ * <p/>
+ * In case of an abnormal situation or when no longer needed the buffer must be
+ * shut down using {@link #shutdown()} method.
+ */
+public class SharedOutputBuffer extends ExpandableBuffer {
+
+ private final ReentrantLock lock;
+ private final Condition condition;
+
+ private volatile DataStreamChannel channel;
+ private volatile boolean shutdown;
+ private volatile boolean endOfStream;
+
+ private volatile ByteBuffer largeWrapper;
+
+ public SharedOutputBuffer(int buffersize) {
+ super(buffersize);
+ this.lock = new ReentrantLock();
+ this.condition = this.lock.newCondition();
+ }
+
+ public void reset() {
+ if (this.shutdown) {
+ return;
+ }
+ this.lock.lock();
+ try {
+ clear();
+ this.endOfStream = false;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean hasData() {
+ this.lock.lock();
+ try {
+ return super.hasData();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public int capacity() {
+ this.lock.lock();
+ try {
+ return super.capacity();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public int length() {
+ this.lock.lock();
+ try {
+ return super.length();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public int produceContent(final DataStreamChannel stream) throws IOException {
+ if (this.shutdown) {
+ return -1;
+ }
+ this.lock.lock();
+ try {
+ this.channel = stream;
+ setOutputMode();
+ int bytesWritten = 0;
+ if (largeWrapper != null || super.hasData()) {
+ if (!buffer().hasRemaining() && largeWrapper != null) {
+ bytesWritten = channel.write(largeWrapper);
+ } else {
+ bytesWritten = channel.write(buffer());
+ }
+ }
+ if ((largeWrapper == null || !largeWrapper.hasRemaining()) && !super.hasData()) {
+ // No more buffered content
+ // If at the end of the stream, terminate
+ this.endOfStream = true;
+ channel.endStream();
+ }
+ // no need to signal if the large wrapper is present and has data remaining
+ if (largeWrapper == null || !largeWrapper.hasRemaining()) {
+ this.condition.signalAll();
+ }
+ return bytesWritten;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public void close() {
+ shutdown();
+ }
+
+ public void shutdown() {
+ if (this.shutdown) {
+ return;
+ }
+ this.shutdown = true;
+ this.lock.lock();
+ try {
+ this.condition.signalAll();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+ public int copy(InputStream in) throws IOException {
+ this.lock.lock();
+ int total = 0;
+ try {
+ if (this.shutdown || this.endOfStream) {
+ throw new IllegalStateException("Buffer already closed for writing");
+ }
+ setInputMode();
+ int i = 0;
+ boolean yielded = false;
+ while (i != -1) {
+ if (!buffer().hasRemaining()) {
+ flushContent();
+ setInputMode();
+ }
+ i = in.available();
+ if (i == 0 && !yielded) {
+ //nothing avail right now, we'll attempt an
+ //output, but not really force a flush.
+ if (buffer().position() != 0 && this.channel != null) {
+ this.channel.requestOutput();
+ }
+ try {
+ condition.awaitNanos(1);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ setInputMode();
+ yielded = true;
+ } else {
+ int p = buffer().position();
+ i = in.read(buffer().array(), buffer().position(), buffer().remaining());
+ yielded = false;
+ if (i != -1) {
+ total += i;
+ buffer().position(p + i);
+ }
+
+ }
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ return total;
+ }
+
+ public void write(final byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ return;
+ }
+ this.lock.lock();
+ try {
+ if (this.shutdown || this.endOfStream) {
+ throw new IllegalStateException("Buffer already closed for writing");
+ }
+ setInputMode();
+ int remaining = len;
+ while (remaining > 0) {
+ if (!buffer().hasRemaining()) {
+ flushContent();
+ setInputMode();
+ }
+ if (buffer().position() == 0 && (buffer().remaining() * 2) < remaining) {
+ largeWrapper = ByteBuffer.wrap(b, off, remaining);
+ while (largeWrapper.hasRemaining()) {
+ flushContent();
+ }
+ largeWrapper = null;
+ remaining = 0;
+ } else {
+ int chunk = Math.min(remaining, buffer().remaining());
+ buffer().put(b, off, chunk);
+ remaining -= chunk;
+ off += chunk;
+ }
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public int write(ByteBuffer b) throws IOException {
+ if (b == null) {
+ return 0;
+ }
+ this.lock.lock();
+ try {
+ if (this.shutdown || this.endOfStream) {
+ throw new IllegalStateException("Buffer already closed for writing");
+ }
+ setInputMode();
+
+ if (!buffer().hasRemaining()) {
+ flushContent();
+ setInputMode();
+ }
+ int c = b.limit() - b.position();
+ largeWrapper = b;
+ while (largeWrapper.hasRemaining()) {
+ flushContent();
+ }
+ largeWrapper = null;
+ return c;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public void write(final byte[] b) throws IOException {
+ if (b == null) {
+ return;
+ }
+ write(b, 0, b.length);
+ }
+
+ public void write(int b) throws IOException {
+ this.lock.lock();
+ try {
+ if (this.shutdown || this.endOfStream) {
+ throw new IllegalStateException("Buffer already closed for writing");
+ }
+ setInputMode();
+ if (!buffer().hasRemaining()) {
+ flushContent();
+ setInputMode();
+ }
+ buffer().put((byte)b);
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public void flush() throws IOException {
+ }
+
+ private void flushContent() throws IOException {
+ this.lock.lock();
+ try {
+ try {
+ while ((largeWrapper != null && largeWrapper.hasRemaining()) || super.hasData()) {
+ if (this.shutdown) {
+ throw new InterruptedIOException("Output operation aborted");
+ }
+ if (this.channel != null) {
+ this.channel.requestOutput();
+ }
+ this.condition.await();
+ }
+ } catch (InterruptedException ex) {
+ throw new IOException("Interrupted while flushing the content buffer");
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ public void writeCompleted() throws IOException {
+ this.lock.lock();
+ try {
+ if (this.endOfStream) {
+ return;
+ }
+ this.endOfStream = true;
+ if (this.channel != null) {
+ this.channel.requestOutput();
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+
+
+}
diff --git a/rt/transports/http-hc5/src/main/resources/META-INF/cxf/bus-extensions.txt b/rt/transports/http-hc5/src/main/resources/META-INF/cxf/bus-extensions.txt
new file mode 100644
index 0000000..42a2ae7
--- /dev/null
+++ b/rt/transports/http-hc5/src/main/resources/META-INF/cxf/bus-extensions.txt
@@ -0,0 +1,3 @@
+org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory:org.apache.cxf.transport.http.HTTPConduitFactory:true:true
+org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpTransportFactory:org.apache.cxf.transport.ConduitInitiator:true:true
+
diff --git a/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java
new file mode 100644
index 0000000..b072c56
--- /dev/null
+++ b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient.hc5;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.Response;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.HTTPConduitFactory;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.workqueue.AutomaticWorkQueueImpl;
+import org.apache.cxf.workqueue.WorkQueueManager;
+import org.apache.hello_world_soap_http.Greeter;
+import org.apache.hello_world_soap_http.SOAPService;
+import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
+import org.apache.hello_world_soap_http.types.GreetMeResponse;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class AsyncHTTPConduitTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = allocatePort(AsyncHTTPConduitTest.class);
+ public static final String PORT_INV = allocatePort(AsyncHTTPConduitTest.class, 2);
+ public static final String FILL_BUFFER = "FillBuffer";
+
+ static Endpoint ep;
+ static String request;
+ static Greeter g;
+
+ @BeforeClass
+ public static void start() throws Exception {
+ Bus b = createStaticBus();
+ b.setProperty(AsyncHTTPConduit.USE_ASYNC, AsyncHTTPConduitFactory.UseAsyncPolicy.ALWAYS);
+ b.setProperty("org.apache.cxf.transport.http.async.MAX_CONNECTIONS", 501);
+
+ BusFactory.setThreadDefaultBus(b);
+
+ AsyncHTTPConduitFactory hcf = (AsyncHTTPConduitFactory)b.getExtension(HTTPConduitFactory.class);
+ assertEquals(501, hcf.getMaxConnections());
+
+ ep = Endpoint.publish("http://localhost:" + PORT + "/SoapContext/SoapPort",
+ new org.apache.hello_world_soap_http.GreeterImpl() {
+ public String greetMeLater(long cnt) {
+ //use the continuations so the async client can
+ //have a ton of connections, use less threads
+ //
+ //mimic a slow server by delaying somewhere between
+ //1 and 2 seconds, with a preference of delaying the earlier
+ //requests longer to create a sort of backlog/contention
+ //with the later requests
+ ContinuationProvider p = (ContinuationProvider)
+ getContext().getMessageContext().get(ContinuationProvider.class.getName());
+ Continuation c = p.getContinuation();
+ if (c.isNew()) {
+ if (cnt < 0) {
+ c.suspend(-cnt);
+ } else {
+ c.suspend(2000 - (cnt % 1000));
+ }
+ return null;
+ }
+ return "Hello, finally! " + cnt;
+ }
+ public String greetMe(String me) {
+ if (me.equals(FILL_BUFFER)) {
+ return String.join("", Collections.nCopies(16093, " "));
+ } else {
+ return "Hello " + me;
+ }
+ }
+ });
+
+ StringBuilder builder = new StringBuilder("NaNaNa");
+ for (int x = 0; x < 50; x++) {
+ builder.append(" NaNaNa ");
+ }
+ request = builder.toString();
+
+ URL wsdl = AsyncHTTPConduitTest.class.getResource("/wsdl/hello_world_services.wsdl");
+ assertNotNull("WSDL is null", wsdl);
+
+ SOAPService service = new SOAPService();
+ assertNotNull("Service is null", service);
+
+ g = service.getSoapPort();
+ assertNotNull("Port is null", g);
+ }
+
+ /**
+  * Closes the shared client proxy and stops the published endpoint.
+  * The endpoint is stopped even when closing the proxy throws, and both steps are skipped
+  * for fields that were never assigned (for example when {@code start()} failed early).
+  */
+ @AfterClass
+ public static void stop() throws Exception {
+     try {
+         if (g != null) {
+             ((java.io.Closeable)g).close();
+         }
+     } finally {
+         if (ep != null) {
+             ep.stop();
+             ep = null;
+         }
+     }
+ }
+
+ /**
+  * Sends the {@code FILL_BUFFER} request (the test endpoint answers it with 16093 spaces)
+  * followed by a regular request; both calls must complete without throwing.
+  */
+ @Test
+ public void testResponseSameBufferSize() throws Exception {
+     updateAddressPort(g, PORT);
+     HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+     c.getClient().setReceiveTimeout(12000);
+     // No try/catch: an exception propagates and fails the test with its full stack trace
+     // instead of being replaced by a message-less fail().
+     g.greetMe(FILL_BUFFER);
+     g.greetMe("Hello");
+ }
+
+ /**
+  * Checks that a synchronous call fails when the response arrives after the receive timeout.
+  * The argument -5000 makes the test endpoint suspend its continuation for 5000 ms, which is
+  * longer than the 3000 ms receive timeout set on the conduit here.
+  */
+ @Test
+ public void testTimeout() throws Exception {
+ updateAddressPort(g, PORT);
+ HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+ c.getClient().setReceiveTimeout(3000);
+ try {
+ assertEquals("Hello " + request, g.greetMeLater(-5000));
+ // Reaching this line means the call returned; fail() raises an AssertionError, which is
+ // not an Exception and therefore is not caught by the handler below.
+ fail();
+ } catch (Exception ex) {
+ //expected!!!
+ }
+ }
+
+
+ @Test
+ public void testTimeoutWithPropertySetting() throws Exception {
+ ((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout",
+ "3000");
+ updateAddressPort(g, PORT);
+
+ try {
+ assertEquals("Hello " + request, g.greetMeLater(-5000));
+ fail();
+ } catch (Exception ex) {
+ //expected!!!
+ }
+ }
+
+ @Test
+ public void testTimeoutAsync() throws Exception {
+ updateAddressPort(g, PORT);
+ HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+ c.getClient().setReceiveTimeout(3000);
+ try {
+ Response<GreetMeLaterResponse> future = g.greetMeLaterAsync(-5000L);
+ future.get();
+ fail();
+ } catch (Exception ex) {
+ //expected!!!
+ }
+ }
+
+ @Test
+ public void testTimeoutAsyncWithPropertySetting() throws Exception {
+ updateAddressPort(g, PORT);
+ ((javax.xml.ws.BindingProvider)g).getRequestContext().put("javax.xml.ws.client.receiveTimeout",
+ "3000");
+ try {
+ Response<GreetMeLaterResponse> future = g.greetMeLaterAsync(-5000L);
+ future.get();
+ fail();
+ } catch (Exception ex) {
+ //expected!!!
+ }
+ }
+
+ @Test
+ public void testConnectIssue() throws Exception {
+ updateAddressPort(g, PORT_INV);
+ try {
+ g.greetMe(request);
+ fail("should have connect exception");
+ } catch (Exception ex) {
+ //expected
+ }
+ }
+
+ @Test
+ public void testInovationWithHCAddress() throws Exception {
+ String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort";
+ JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+ factory.setServiceClass(Greeter.class);
+ factory.setAddress(address);
+ Greeter greeter = factory.create(Greeter.class);
+ String response = greeter.greetMe("test");
+ assertEquals("Get a wrong response", "Hello test", response);
+ }
+
+ @Test
+ public void testInvocationWithTransportId() throws Exception {
+ String address = "http://localhost:" + PORT + "/SoapContext/SoapPort";
+ JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+ factory.setServiceClass(Greeter.class);
+ factory.setAddress(address);
+ factory.setTransportId("http://cxf.apache.org/transports/http/http-client");
+ Greeter greeter = factory.create(Greeter.class);
+ String response = greeter.greetMe("test");
+ assertEquals("Get a wrong response", "Hello test", response);
+ }
+ @Test
+ public void testCall() throws Exception {
+ updateAddressPort(g, PORT);
+ assertEquals("Hello " + request, g.greetMe(request));
+ HTTPConduit c = (HTTPConduit)ClientProxy.getClient(g).getConduit();
+ HTTPClientPolicy cp = new HTTPClientPolicy();
+ cp.setAllowChunking(false);
+ c.setClient(cp);
+ assertEquals("Hello " + request, g.greetMe(request));
+ }
+ @Test
+ public void testCallAsync() throws Exception {
+ updateAddressPort(g, PORT);
+ GreetMeResponse resp = (GreetMeResponse)g.greetMeAsync(request, new AsyncHandler<GreetMeResponse>() {
+ public void handleResponse(Response<GreetMeResponse> res) {
+ try {
+ res.get().getResponseType();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+ }).get();
+ assertEquals("Hello " + request, resp.getResponseType());
+
+ g.greetMeLaterAsync(1000, new AsyncHandler<GreetMeLaterResponse>() {
+ public void handleResponse(Response<GreetMeLaterResponse> res) {
+ }
+ }).get();
+ }
+
+ /**
+  * Issues a series of asynchronous calls against {@code PORT_INV}, a port on which this test
+  * class publishes no endpoint, and checks that the handler runs exactly once per request.
+  */
+ @Test
+ public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception {
+     // This test is especially targeted for RHEL 6.8
+     updateAddressPort(g, PORT_INV);
+     final int repeat = 100;
+     final AtomicInteger callbacks = new AtomicInteger(0);
+     int attempt = 0;
+     while (attempt < repeat) {
+         try {
+             g.greetMeAsync(request, res -> callbacks.incrementAndGet()).get();
+         } catch (Exception ignored) {
+             // the outcome of each call is irrelevant here; only the callback count is checked
+         }
+         attempt++;
+     }
+     Thread.sleep(1000);
+     assertEquals("Callback should be invoked only once per request", repeat, callbacks.intValue());
+ }
+
+ @Test
+ public void testCallAsyncWithFullWorkQueue() throws Exception {
+ Bus bus = BusFactory.getThreadDefaultBus();
+ WorkQueueManager workQueueManager = bus.getExtension(WorkQueueManager.class);
+ AutomaticWorkQueueImpl automaticWorkQueue1 = (AutomaticWorkQueueImpl)workQueueManager.getAutomaticWorkQueue();
+ updateAddressPort(g, PORT);
+
+ Client client = ClientProxy.getClient(g);
+ HTTPConduit http = (HTTPConduit) client.getConduit();
+
+ HTTPClientPolicy httpClientPolicy = new HTTPClientPolicy();
+
+ int asyncExecuteTimeout = 500;
+ httpClientPolicy.setAsyncExecuteTimeout(asyncExecuteTimeout);
+
+ http.setClient(httpClientPolicy);
+
+ long repeat = automaticWorkQueue1.getHighWaterMark() + automaticWorkQueue1.getMaxSize() + 1;
+ CountDownLatch initialThreadsLatch = new CountDownLatch(automaticWorkQueue1.getHighWaterMark());
+ CountDownLatch doneLatch = new CountDownLatch((int) repeat);
+ AtomicInteger threadCount = new AtomicInteger();
+
+ for (long i = 0; i < repeat; i++) {
+ g.greetMeLaterAsync(-50, res -> {
+
+ try {
+ int myCount = threadCount.getAndIncrement();
+
+ if (myCount < automaticWorkQueue1.getHighWaterMark()) {
+ // Sleep long enough so that the workqueue will fill up and then
+ // handleResponseOnWorkqueue will fail for the calls from both
+ // responseReceived and consumeContent
+ Thread.sleep(3L * asyncExecuteTimeout);
+ initialThreadsLatch.countDown();
+ } else {
+ Thread.sleep(50);
+ }
+ initialThreadsLatch.await();
+ doneLatch.countDown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ doneLatch.await(30, TimeUnit.SECONDS);
+
+ assertEquals("All responses should be handled eventually", 0, doneLatch.getCount());
+ }
+}
diff --git a/rt/transports/pom.xml b/rt/transports/pom.xml
index e486fef..305b0f6 100644
--- a/rt/transports/pom.xml
+++ b/rt/transports/pom.xml
@@ -35,6 +35,7 @@
<module>http-jetty</module>
<module>http-undertow</module>
<module>http-hc</module>
+ <module>http-hc5</module>
<module>http-netty/netty-server</module>
<module>http-netty/netty-client</module>
<module>jms</module>
diff --git a/systests/pom.xml b/systests/pom.xml
index eb29fa8..6187e97 100644
--- a/systests/pom.xml
+++ b/systests/pom.xml
@@ -56,5 +56,6 @@
<module>microprofile</module>
<module>spring-boot</module>
<module>transport-netty</module>
+ <module>transport-hc5</module>
</modules>
</project>
diff --git a/systests/transport-hc5/pom.xml b/systests/transport-hc5/pom.xml
new file mode 100644
index 0000000..c236870
--- /dev/null
+++ b/systests/transport-hc5/pom.xml
@@ -0,0 +1,163 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>cxf-parent</artifactId>
+ <groupId>org.apache.cxf</groupId>
+ <version>3.5.0-SNAPSHOT</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.cxf.systests</groupId>
+ <artifactId>cxf-systests-transport-hc5</artifactId>
+ <name>Apache CXF Apache HttpClient 5.x Transport System Tests</name>
+ <description>Apache CXF Apache HttpClient 5.x Transport System Tests</description>
+ <url>https://cxf.apache.org</url>
+
+ <properties>
+ <cxf.module.name>org.apache.cxf.systests.transport.hc5</cxf.module.name>
+ </properties>
+
+ <build>
+ <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>
+ <testResources>
+ <testResource>
+ <directory>src/test/java</directory>
+ <excludes>
+ <exclude>**/*.java</exclude>
+ </excludes>
+ </testResource>
+ <testResource>
+ <directory>src/test/resources</directory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Automatic-Module-Name>${cxf.module.name}.tests</Automatic-Module-Name>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-databinding-jaxb</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-hc5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-jetty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxws</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-rs-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-testutils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-testutils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <classifier>keys</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-features-logging</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>cglib</groupId>
+ <artifactId>cglib-nodep</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/Book.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/Book.java
new file mode 100644
index 0000000..ffd2de3
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/Book.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.hc5.jaxrs;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+
+@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "class")
+@XmlRootElement(name = "Book")
+public class Book {
+ private String name;
+ private long id;
+ private Map<Long, Chapter> chapters = new HashMap<>();
+
+ public Book() {
+ Chapter c1 = new Chapter();
+ c1.setId(1L);
+ c1.setTitle("chapter 1");
+ chapters.put(c1.getId(), c1);
+ Chapter c2 = new Chapter();
+ c2.setId(2L);
+ c2.setTitle("chapter 2");
+ chapters.put(c2.getId(), c2);
+ }
+
+ public Book(String name, long id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ public void setName(String n) {
+ name = n;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setId(long i) {
+ id = i;
+ }
+ public long getId() {
+ return id;
+ }
+
+ @PUT
+ public void cloneState(Book book) {
+ id = book.getId();
+ name = book.getName();
+ }
+
+ @GET
+ public Book retrieveState() {
+ return this;
+ }
+
+ @GET
+ @Path("chapters/{chapterid}/")
+ @Produces("application/xml;charset=ISO-8859-1")
+ public Chapter getChapter(@PathParam("chapterid") long chapterid) {
+ return chapters.get(chapterid);
+ }
+
+ @GET
+ @Path("chapters/acceptencoding/{chapterid}/")
+ @Produces("application/xml")
+ public Chapter getChapterAcceptEncoding(@PathParam("chapterid") long chapterid) {
+ return chapters.get(chapterid);
+ }
+
+ @GET
+ @Path("chapters/badencoding/{chapterid}/")
+ @Produces("application/xml;charset=UTF-48")
+ public Chapter getChapterBadEncoding(@PathParam("chapterid") long chapterid) {
+ return chapters.get(chapterid);
+ }
+
+ @Path("chapters/sub/{chapterid}/")
+ public Chapter getSubChapter(@PathParam("chapterid") long chapterid) {
+ return chapters.get(chapterid);
+ }
+
+ @Path("chaptersobject/sub/{chapterid}/")
+ public Object getSubChapterObject(@PathParam("chapterid") long chapterid) {
+ return getSubChapter(chapterid);
+ }
+
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/BookServerAsyncClient.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/BookServerAsyncClient.java
new file mode 100644
index 0000000..0d81281
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/BookServerAsyncClient.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.hc5.jaxrs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.MessageBodyWriter;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.cxf.jaxrs.provider.StreamingResponseProvider;
+import org.apache.cxf.testutil.common.AbstractServerTestServerBase;
+
+public class BookServerAsyncClient extends AbstractServerTestServerBase {
+ public static final String PORT = allocatePort(BookServerAsyncClient.class);
+
+ @Override
+ protected Server createServer(Bus bus) throws Exception {
+ JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean();
+ sf.setResourceClasses(BookStore.class);
+ sf.setResourceProvider(BookStore.class,
+ new SingletonResourceProvider(new BookStore(), true));
+ sf.setAddress("http://localhost:" + PORT + "/");
+ sf.setProvider(new BooleanReaderWriter());
+ sf.setProvider(new JacksonJsonProvider());
+ sf.setProvider(new StreamingResponseProvider<Book>());
+ sf.getProperties(true).put("default.content.type", "*/*");
+ return sf.create();
+ }
+
+ public static void main(String[] args) throws Exception {
+ new BookServerAsyncClient().start();
+ }
+
+ @Consumes("text/boolean")
+ @Produces("text/boolean")
+ public static class BooleanReaderWriter implements
+ MessageBodyReader<Object>, MessageBodyWriter<Boolean> {
+
+ @Override
+ public boolean isReadable(Class<?> arg0, Type arg1, Annotation[] arg2, MediaType arg3) {
+ return true;
+ }
+
+ @Override
+ public Object readFrom(Class<Object> arg0, Type arg1, Annotation[] arg2, MediaType arg3,
+ MultivaluedMap<String, String> arg4, InputStream is) throws IOException,
+ WebApplicationException {
+ return Boolean.valueOf(IOUtils.readStringFromStream(is));
+ }
+
+ @Override
+ public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations,
+ MediaType mediaType) {
+ return true;
+ }
+
+ @Override
+ public long getSize(Boolean t, Class<?> type, Type genericType, Annotation[] annotations,
+ MediaType mediaType) {
+ return -1;
+ }
+
+ /**
+  * Writes the textual form of the given Boolean to the output stream as UTF-8 bytes.
+  *
+  * @param t the value to serialize; its {@code toString()} result is written
+  * @param os the stream receiving the encoded bytes
+  * @throws IOException if writing to the stream fails
+  */
+ @Override
+ public void writeTo(Boolean t, Class<?> type, Type genericType, Annotation[] annotations,
+                     MediaType mediaType, MultivaluedMap<String, Object> httpHeaders,
+                     OutputStream os) throws IOException, WebApplicationException {
+     // The Charset constant avoids the by-name charset lookup done by getBytes(String)
+     os.write(t.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+
+ }
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/BookStore.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/BookStore.java
new file mode 100644
index 0000000..dc4d583
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/BookStore.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.hc5.jaxrs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.MatrixParam;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.cxf.annotations.GZIP;
+import org.apache.cxf.jaxrs.ext.MessageContext;
+import org.apache.cxf.jaxrs.ext.Oneway;
+import org.apache.cxf.jaxrs.ext.PATCH;
+import org.apache.cxf.jaxrs.ext.StreamingResponse;
+import org.apache.cxf.jaxrs.impl.MetadataMap;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.PhaseInterceptorChain;
+
+@Path("/bookstore")
+@GZIP(threshold = 1)
+public class BookStore {
+
+ private Map<Long, Book> books = new HashMap<>();
+ private long bookId = 123;
+
+ @Context
+ private UriInfo ui;
+ @Context
+ private MessageContext messageContext;
+
+ public BookStore() {
+ init();
+ }
+
+ @GET
+ @Path("/")
+ public Book getBookRoot() {
+ return new Book("root", 124L);
+ }
+ @PUT
+ @Path("/updatebook/{id}")
+ @Consumes("application/xml")
+ @Produces("application/xml")
+ public Book updateEchoBook(Book book) {
+ if (book.getId() != Long.parseLong(ui.getPathParameters().getFirst("id"))) {
+ throw new WebApplicationException(404);
+ }
+ return new Book("Updated " + book.getName(), book.getId());
+ }
+
+ @GET
+ @Path("/books/wildcard")
+ @Produces("text/*")
+ public String getBookTextWildcard() {
+ return "book";
+ }
+
+ @RETRIEVE
+ @Path("/retrieve")
+ @Produces("application/xml")
+ @Consumes("application/xml")
+ public Book retrieveBook(Book book) {
+ return book;
+ }
+
+ /**
+  * Echoes the submitted book back in an OK response. When the book is named "Timeout" the
+  * method first sleeps for 2000 ms before answering.
+  *
+  * @param book the book received in the request body
+  * @return an OK response carrying the same book
+  */
+ @PATCH
+ @Path("/patch")
+ @Produces("application/xml")
+ @Consumes("application/xml")
+ public Response patchBook(Book book) {
+     if ("Timeout".equals(book.getName())) {
+         try {
+             Thread.sleep(2000);
+         } catch (InterruptedException e) {
+             // Keep the interrupt visible to the calling thread instead of swallowing it
+             Thread.currentThread().interrupt();
+         }
+     }
+     return Response.ok(book).build();
+ }
+
+ @DELETE
+ @Path("/deletebody")
+ @Produces("application/xml")
+ @Consumes("application/xml")
+ public Book deleteBodyBook(Book book) {
+ return book;
+ }
+
+ @GET
+ @Path("setcookies")
+ public Response setComplexCookies() {
+ return Response.ok().header("Set-Cookie",
+ "bar.com.anoncart=107894933471602436; Domain=.bar.com;"
+ + " Expires=Thu, 01-Oct-2020 23:44:22 GMT; Path=/")
+ .build();
+ }
+
+ @GET
+ @Path("books/check/{id}")
+ @Produces("text/plain,text/boolean")
+ public boolean checkBook(@PathParam("id") Long id) {
+ return books.containsKey(id);
+ }
+
+ @GET
+ @Path("/books/statusFromStream")
+ @Produces("text/xml")
+ public Response statusFromStream() {
+ return Response.ok(new ResponseStreamingOutputImpl()).type("text/plain").build();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @GET
+ @Path("/books/streamingresponse")
+ @Produces("text/xml")
+ public Response getBookStreamingResponse() {
+ return Response.ok(new StreamingResponse() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void writeTo(Writer writer) throws IOException {
+ writer.write(new Book("stream", 124L));
+ }
+
+ }).build();
+ }
+
+ @POST
+ @Path("/oneway")
+ @Oneway
+ public void onewayRequest() {
+ if (!PhaseInterceptorChain.getCurrentMessage().getExchange().isOneWay()) {
+ throw new WebApplicationException();
+ }
+ }
+
+ @POST
+ @Path("/no-content")
+ public void noContent() {
+ }
+
+ @GET
+ @Path("/books/{bookId}/")
+ @Produces("application/xml")
+ public Book getBook(@PathParam("bookId") String id) {
+ return doGetBook(id);
+ }
+
+ @GET
+ @Path("/segment/matrix")
+ public Book getBookByMatrixParams(@MatrixParam("first") String s1,
+ @MatrixParam("second") String s2) throws Exception {
+
+ return doGetBook(s1 + s2);
+ }
+
+ public final String init() {
+ books.clear();
+ bookId = 123;
+
+ Book book = new Book();
+ book.setId(bookId);
+ book.setName("CXF in Action");
+ books.put(book.getId(), book);
+
+ return "OK";
+ }
+
+ /**
+  * Streaming body that changes the response while it is being written: it requires the
+  * message context to report "text/plain" as Content-Type, then sets the response code to 503,
+  * installs a fresh protocol header map and finally writes a short text message.
+  */
+ private class ResponseStreamingOutputImpl implements StreamingOutput {
+     public void write(OutputStream output) throws IOException, WebApplicationException {
+         if (!"text/plain".equals(BookStore.this.messageContext.get("Content-Type"))) {
+             throw new RuntimeException();
+         }
+         BookStore.this.messageContext.put(Message.RESPONSE_CODE, 503);
+         MultivaluedMap<String, String> headers = new MetadataMap<>();
+         headers.putSingle("Content-Type", "text/custom+plain");
+         headers.putSingle("CustomHeader", "CustomValue");
+         BookStore.this.messageContext.put(Message.PROTOCOL_HEADERS, headers);
+
+         // Encode explicitly so the bytes do not depend on the platform default charset
+         output.write("Response is not available".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+     }
+ }
+
+ private Book doGetBook(String id) {
+ Book book = books.get(Long.parseLong(id));
+ if (book != null) {
+ return book;
+ }
+
+ throw new NotFoundException(Response
+ .status(Status.NOT_FOUND)
+ .entity("The book with ID '" + id + "' was not found")
+ .build());
+ }
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/Chapter.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/Chapter.java
new file mode 100644
index 0000000..a10c33e
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/Chapter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.hc5.jaxrs;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+/**
+ * Chapter bean (title and id) that is also annotated with JAX-RS resource methods,
+ * each of which returns the chapter itself or information from the request's {@link UriInfo}.
+ */
+@XmlRootElement(name = "Chapter")
+public class Chapter {
+    private String title;
+    private long id;
+
+    public Chapter() {
+    }
+
+    public void setTitle(String n) {
+        this.title = n;
+    }
+
+    public String getTitle() {
+        return this.title;
+    }
+
+    public void setId(long i) {
+        this.id = i;
+    }
+
+    public long getId() {
+        return this.id;
+    }
+
+    /** Returns this chapter for GET requests on "/recurse". */
+    @GET
+    @Path("/recurse")
+    @Produces("application/xml")
+    public Chapter getItself() {
+        return this;
+    }
+
+    /** Returns this chapter for the "/recurse2" path; carries no HTTP method annotation. */
+    @Path("/recurse2")
+    public Chapter getItself2() {
+        return this;
+    }
+
+    /** Returns this chapter as XML declared with the ISO-8859-1 charset. */
+    @GET
+    @Produces("application/xml;charset=ISO-8859-1")
+    public Chapter get() {
+        return this;
+    }
+
+    /**
+     * Returns this chapter only when the path parameters are book 123 and chapter 1.
+     *
+     * @throws RuntimeException for any other combination of ids
+     */
+    @GET
+    @Path("/ids")
+    @Produces("application/xml;charset=ISO-8859-1")
+    public Chapter getWithBookId(@PathParam("bookId") int bookId,
+                                 @PathParam("chapterid") int chapterId) {
+        final boolean expectedIds = bookId == 123 && chapterId == 1;
+        if (!expectedIds) {
+            throw new RuntimeException();
+        }
+        return this;
+    }
+
+
+    /** Returns the string form of the list of {@code toString()} values of the matched resources. */
+    @GET
+    @Path("/matched-resources")
+    @Produces("text/plain")
+    public String getMatchedResources(@Context UriInfo ui) {
+        final List<Object> matched = ui.getMatchedResources();
+        final List<String> names = new ArrayList<>(matched.size());
+        for (Object resource : matched) {
+            names.add(resource.toString());
+        }
+        return names.toString();
+    }
+
+    /** Returns the string form of the matched URIs; {@code decode} is parsed as a boolean flag. */
+    @GET
+    @Path("/matched!uris")
+    @Produces("text/plain")
+    public String getMatchedUris(@Context UriInfo ui,
+                                 @QueryParam("decode") String decode) {
+        final boolean decodeUris = Boolean.parseBoolean(decode);
+        return ui.getMatchedURIs(decodeUris).toString();
+    }
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientTest.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientTest.java
new file mode 100644
index 0000000..fd1cd7d
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientTest.java
@@ -0,0 +1,638 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.hc5.jaxrs;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Priority;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.ClientRequestContext;
+import javax.ws.rs.client.ClientResponseContext;
+import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.ResponseProcessingException;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.xml.ws.Holder;
+
+import org.apache.cxf.jaxrs.client.ClientConfiguration;
+import org.apache.cxf.jaxrs.client.JAXRSClientFactory;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.cxf.jaxrs.model.AbstractResourceInfo;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase {
+ public static final String PORT = BookServerAsyncClient.PORT;
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ AbstractResourceInfo.clearAllMaps();
+ assertTrue("server did not launch correctly", launchServer(BookServerAsyncClient.class, true)); // abort the class if the book server cannot be launched
+ createStaticBus();
+ }
+
+ /** Invokes the custom RETRIEVE method synchronously and expects a book named "Retrieve" back. */
+ @Test
+ public void testRetrieveBookCustomMethodAsyncSync() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/retrieve";
+     WebClient wc = createWebClient(address).type("application/xml").accept("application/xml");
+     try {
+         Book book = wc.invoke("RETRIEVE", new Book("Retrieve", 123L), Book.class);
+         assertEquals("Retrieve", book.getName());
+     } finally {
+         // Release the client even when the call or the assertion fails.
+         wc.close();
+     }
+ }
+
+ /** Sends a PATCH with a book entity and expects a book named "Patch" back. */
+ @Test
+ public void testPatchBook() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/patch";
+     WebClient wc = createWebClient(address).type("application/xml");
+     try {
+         Book book = wc.invoke("PATCH", new Book("Patch", 123L), Book.class);
+         assertEquals("Patch", book.getName());
+     } finally {
+         // Release the client even when the call or the assertion fails.
+         wc.close();
+     }
+ }
+
+ /**
+  * Sends a PATCH with 500 ms receive and connection timeouts and expects the call to
+  * fail with a {@link ProcessingException}.
+  */
+ @Test
+ public void testPatchBookTimeout() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/patch";
+     WebClient wc = WebClient.create(address).type("application/xml");
+     try {
+         ClientConfiguration clientConfig = WebClient.getConfig(wc);
+         clientConfig.getRequestContext().put(AsyncHTTPConduit.USE_ASYNC, true);
+         HTTPClientPolicy clientPolicy = clientConfig.getHttpConduit().getClient();
+         clientPolicy.setReceiveTimeout(500);
+         clientPolicy.setConnectionTimeout(500);
+         try {
+             Book book = wc.invoke("PATCH", new Book("Timeout", 123L), Book.class);
+             fail("should throw an exception due to timeout, instead got " + book);
+         } catch (ProcessingException e) {
+             //expected!!!
+         }
+     } finally {
+         // The client was previously never closed by this test.
+         wc.close();
+     }
+ }
+
+ /** Sends a PATCH whose entity is a raw XML input stream and expects a book named "Patch" back. */
+ @Test
+ public void testPatchBookInputStream() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/patch";
+     WebClient wc = createWebClient(address).type("application/xml");
+     try {
+         Book book = wc.invoke("PATCH",
+                               new ByteArrayInputStream(
+                                   "<Book><name>Patch</name><id>123</id></Book>".getBytes()),
+                               Book.class);
+         assertEquals("Patch", book.getName());
+     } finally {
+         // Release the client even when the call or the assertion fails.
+         wc.close();
+     }
+ }
+
+ /** Sends a DELETE that carries a book entity and expects a book named "Delete" back. */
+ @Test
+ public void testDeleteWithBody() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/deletebody";
+     WebClient wc = createWebClient(address).type("application/xml").accept("application/xml");
+     try {
+         Book book = wc.invoke("DELETE", new Book("Delete", 123L), Book.class);
+         assertEquals("Delete", book.getName());
+     } finally {
+         // Release the client even when the call or the assertion fails.
+         wc.close();
+     }
+ }
+
+ /** Invokes the custom RETRIEVE method through the async invoker and checks the returned book name. */
+ @Test
+ public void testRetrieveBookCustomMethodAsync() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/retrieve";
+     WebClient wc = createWebClient(address).accept("application/xml");
+     try {
+         Future<Book> book = wc.async().method("RETRIEVE", Entity.xml(new Book("Retrieve", 123L)),
+                                               Book.class);
+         assertEquals("Retrieve", book.get().getName());
+     } finally {
+         // Release the client even when the call or the assertion fails.
+         wc.close();
+     }
+ }
+
+ /** Requests a raw {@link Response} asynchronously and expects status 404 rather than an exception. */
+ @Test
+ public void testGetBookAsyncResponseNotFound() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/bookheaders/404";
+     WebClient wc = createWebClient(address);
+     try {
+         Future<Response> future = wc.async().get(Response.class);
+         assertEquals(404, future.get().getStatus());
+     } finally {
+         // Release the client even when the call or the assertion fails.
+         wc.close();
+     }
+ }
+
+ /** PUTs a book asynchronously and checks the id and name of the book in the 200 response. */
+ @Test
+ public void testGetBookAsyncUpdate() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/updatebook/124";
+     WebClient wc = createWebClient(address);
+     try {
+         Future<Response> future = wc.async().put(Entity.xml(new Book("My CXF Book", 124)));
+         final Response response = future.get();
+         assertEquals(200, response.getStatus());
+         final Book book = response.readEntity(Book.class);
+         assertThat(book.getId(), equalTo(124L));
+         assertThat(book.getName(), equalTo("Updated My CXF Book"));
+     } finally {
+         // Release the client even when the call or an assertion fails.
+         wc.close();
+     }
+ }
+
+ /**
+  * Requests a typed {@code Book} asynchronously from a 404 endpoint and expects the future
+  * to fail with a {@link NotFoundException} cause.
+  */
+ @Test
+ public void testGetBookAsync404() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/bookheaders/404";
+     WebClient wc = createWebClient(address);
+     try {
+         Future<Book> future = wc.async().get(Book.class);
+         try {
+             future.get();
+             fail("Exception expected");
+         } catch (ExecutionException ex) {
+             assertTrue(ex.getCause() instanceof NotFoundException);
+         }
+     } finally {
+         // Release the client even when fail() or the assertion throws.
+         wc.close();
+     }
+ }
+
+ /**
+  * Targets the address 168.168.168.168 through an async WebClient and expects the future to fail
+  * with a {@link ProcessingException} whose own cause is an {@link IOException}.
+  */
+ @Test
+ public void testNonExistentHostnameAsync() throws Exception {
+     final String address = "http://168.168.168.168/bookstore";
+     final List<Object> providers = new ArrayList<>();
+     providers.add(new TestResponseFilter());
+     final WebClient wc = createWebClient(address, providers);
+     final Future<Book> pending = wc.async().get(Book.class);
+     try {
+         pending.get();
+         fail("Exception expected");
+     } catch (ExecutionException ex) {
+         final Throwable processingFailure = ex.getCause();
+         assertTrue(processingFailure instanceof ProcessingException);
+         assertTrue(processingFailure.getCause() instanceof IOException);
+     } finally {
+         wc.close();
+     }
+ }
+
+ /**
+  * Same scenario as the WebClient variant, but through the standard JAX-RS client API: the future
+  * must fail with a {@link ProcessingException} caused by an {@link IOException}.
+  */
+ @Test
+ public void testNonExistentHostnameGet() throws Exception {
+     final String address = "http://168.168.168.168/bookstore";
+     final Client c = ClientBuilder.newClient();
+     c.register(new TestResponseFilter());
+     final WebTarget t1 = c.target(address);
+     final Future<Response> pending = t1.request().async().get();
+     try {
+         pending.get();
+         fail("Exception expected");
+     } catch (ExecutionException ex) {
+         final Throwable processingFailure = ex.getCause();
+         assertTrue(processingFailure instanceof ProcessingException);
+         assertTrue(processingFailure.getCause() instanceof IOException);
+     } finally {
+         c.close();
+     }
+ }
+
+ /**
+  * Submits a POST to the address 168.168.168.168 and expects the future to fail with a
+  * {@link ProcessingException} cause.
+  */
+ @Test
+ public void testNonExistentHostnamePost() throws Exception {
+     Client client = ClientBuilder.newClient();
+     try {
+         WebTarget target = client.target("http://168.168.168.168/");
+         Invocation.Builder builder = target.request();
+         Entity<String> entity = Entity.entity("entity", MediaType.WILDCARD_TYPE);
+         Invocation invocation = builder.buildPost(entity);
+         Future<String> future = invocation.submit(
+             new GenericType<String>() {
+             }
+         );
+
+         try {
+             future.get();
+             // Without this the test passed silently when no exception was raised.
+             fail("Exception expected");
+         } catch (ExecutionException ex) {
+             Throwable cause = ex.getCause();
+             assertTrue(cause instanceof ProcessingException);
+         }
+     } finally {
+         // The client was previously never closed by this test.
+         client.close();
+     }
+ }
+
+ /**
+  * Posts a book with a message body writer that always throws and expects the future to fail
+  * with a {@link ProcessingException} cause.
+  */
+ @Test
+ public void testPostBookProcessingException() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/";
+     List<Object> providers = new ArrayList<>();
+     providers.add(new FaultyBookWriter());
+     WebClient wc = createWebClient(address, providers);
+     try {
+         Future<Book> future = wc.async().post(Entity.xml(new Book()), Book.class);
+         try {
+             future.get();
+             fail("Exception expected");
+         } catch (ExecutionException ex) {
+             assertTrue(ex.getCause() instanceof ProcessingException);
+         }
+     } finally {
+         // Release the client even when fail() or the assertion throws.
+         wc.close();
+     }
+ }
+
+ /**
+  * Reads a book with a message body reader that always throws and expects the future to fail
+  * with a {@link ResponseProcessingException} cause.
+  */
+ @Test
+ public void testGetBookResponseProcessingException() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/books/123";
+     List<Object> providers = new ArrayList<>();
+     providers.add(new FaultyBookReader());
+     WebClient wc = createWebClient(address, providers);
+     try {
+         Future<Book> future = wc.async().get(Book.class);
+         try {
+             future.get();
+             fail("Exception expected");
+         } catch (ExecutionException ex) {
+             assertTrue(ex.getCause() instanceof ResponseProcessingException);
+         }
+     } finally {
+         // Release the client even when fail() or the assertion throws.
+         wc.close();
+     }
+ }
+
+ /**
+  * Runs an async GET with a callback whose type parameter is not bound to a concrete class and
+  * checks that the callback received a response whose entity reads as {@code true}.
+  */
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testGenericInvocationCallback() throws Exception {
+     InvocationCallback<?> callback = createGenericInvocationCallback();
+     String address = "http://localhost:" + PORT + "/bookstore/books/check/123";
+     Client client = ClientBuilder
+         .newBuilder()
+         .register(new BookServerAsyncClient.BooleanReaderWriter())
+         .build();
+     try {
+         client
+             .target(address)
+             .request()
+             .accept("text/boolean")
+             .async()
+             .get(callback)
+             .get();
+
+         assertTrue(((GenericInvocationCallback)callback).getResult().readEntity(Boolean.class));
+     } finally {
+         // The client was previously never closed by this test.
+         client.close();
+     }
+ }
+
+ /**
+  * Calls {@code checkBook} on an async-enabled proxy with a registered callback and expects the
+  * callback to receive {@code true}.
+  */
+ @Test
+ public void testAsyncProxyPrimitiveResponse() throws Exception {
+     String address = "http://localhost:" + PORT;
+     final Holder<Boolean> holder = new Holder<>();
+     // Signals callback completion; also makes the holder write visible to the test thread.
+     final CountDownLatch done = new CountDownLatch(1);
+     final InvocationCallback<Boolean> callback = new InvocationCallback<Boolean>() {
+         public void completed(Boolean response) {
+             holder.value = response;
+             done.countDown();
+         }
+         public void failed(Throwable error) {
+             // Do not keep the test waiting; the null holder fails the assertions below.
+             done.countDown();
+         }
+     };
+
+     BookStore store = JAXRSClientFactory.create(address, BookStore.class);
+     WebClient.getConfig(store).getRequestContext().put(AsyncHTTPConduit.USE_ASYNC, true);
+     WebClient.getConfig(store).getRequestContext().put(InvocationCallback.class.getName(), callback);
+
+     store.checkBook(123L);
+     // Wait for the callback instead of sleeping for a fixed interval.
+     assertTrue("callback was not invoked in time", done.await(10, TimeUnit.SECONDS));
+     assertNotNull(holder.value);
+     assertTrue(holder.value);
+ }
+
+ /**
+  * Calls {@code getBookByMatrixParams} on an async-enabled proxy with a registered callback;
+  * the proxy call itself returns {@code null} and the book (id 123) arrives through the callback.
+  */
+ @Test
+ public void testAsyncProxyBookResponse() throws Exception {
+     String address = "http://localhost:" + PORT;
+     final Holder<Book> holder = new Holder<>();
+     // Signals callback completion; also makes the holder write visible to the test thread.
+     final CountDownLatch done = new CountDownLatch(1);
+     final InvocationCallback<Book> callback = new InvocationCallback<Book>() {
+         public void completed(Book response) {
+             holder.value = response;
+             done.countDown();
+         }
+         public void failed(Throwable error) {
+             // Do not keep the test waiting; the null holder fails the assertions below.
+             done.countDown();
+         }
+     };
+
+     BookStore store = JAXRSClientFactory.create(address, BookStore.class);
+     WebClient.getConfig(store).getRequestContext().put(AsyncHTTPConduit.USE_ASYNC, true);
+     WebClient.getConfig(store).getRequestContext().put(InvocationCallback.class.getName(), callback);
+
+     Book book = store.getBookByMatrixParams("12", "3");
+     assertNull(book);
+     // Wait for the callback instead of sleeping for a fixed interval.
+     assertTrue("callback was not invoked in time", done.await(10, TimeUnit.SECONDS));
+     assertNotNull(holder.value);
+     assertEquals(123L, holder.value.getId());
+ }
+
+ /**
+  * Registers a list of two callbacks (Book and Boolean) on one proxy and checks that each proxy
+  * call delivers its result to the callback of the matching type.
+  */
+ @Test
+ public void testAsyncProxyMultipleCallbacks() throws Exception {
+     String address = "http://localhost:" + PORT;
+     final Holder<Book> bookHolder = new Holder<>();
+     final CountDownLatch bookDone = new CountDownLatch(1);
+     final InvocationCallback<Book> bookCallback = new InvocationCallback<Book>() {
+         public void completed(Book response) {
+             bookHolder.value = response;
+             bookDone.countDown();
+         }
+         public void failed(Throwable error) {
+             // Do not keep the test waiting; the null holder fails the assertions below.
+             bookDone.countDown();
+         }
+     };
+     final Holder<Boolean> booleanHolder = new Holder<>();
+     final CountDownLatch booleanDone = new CountDownLatch(1);
+     final InvocationCallback<Boolean> booleanCallback = new InvocationCallback<Boolean>() {
+         public void completed(Boolean response) {
+             booleanHolder.value = response;
+             booleanDone.countDown();
+         }
+         public void failed(Throwable error) {
+             booleanDone.countDown();
+         }
+     };
+     List<InvocationCallback<?>> callbacks = new ArrayList<>();
+     callbacks.add(bookCallback);
+     callbacks.add(booleanCallback);
+
+     BookStore store = JAXRSClientFactory.create(address, BookStore.class);
+     WebClient.getConfig(store).getRequestContext().put(InvocationCallback.class.getName(), callbacks);
+     WebClient.getConfig(store).getRequestContext().put(AsyncHTTPConduit.USE_ASYNC, true);
+
+     Book book = store.getBookByMatrixParams("12", "3");
+     assertNull(book);
+     // Wait for each callback instead of sleeping for a fixed interval.
+     assertTrue("book callback was not invoked in time", bookDone.await(10, TimeUnit.SECONDS));
+     assertNotNull(bookHolder.value);
+     assertEquals(123L, bookHolder.value.getId());
+
+     store.checkBook(123L);
+     assertTrue("boolean callback was not invoked in time", booleanDone.await(10, TimeUnit.SECONDS));
+     assertNotNull(booleanHolder.value);
+     assertTrue(booleanHolder.value);
+ }
+
+ /**
+  * Runs an async GET with a callback against a 404 endpoint and expects the same
+  * {@link NotFoundException} instance in both the callback and the future.
+  */
+ @Test
+ public void testGetBookAsyncNotFoundCallback() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/bookheaders/404";
+     WebClient wc = createWebClient(address);
+     try {
+         final Holder<Object> holder = new Holder<>();
+         InvocationCallback<Object> callback = createCallback(holder);
+         try {
+             wc.async().get(callback).get();
+             fail("Exception expected");
+         } catch (ExecutionException ex) {
+             assertTrue(ex.getCause() instanceof NotFoundException);
+             assertTrue(ex.getCause() == holder.value);
+         }
+     } finally {
+         // Release the client even when fail() or an assertion throws.
+         wc.close();
+     }
+ }
+
+ /** Checks that a registered response filter adds the "X-Done" header to an async 200 response. */
+ @Test
+ public void testClientResponseFilter() throws Exception {
+     final String address = "http://localhost:" + PORT + "/bookstore/books/wildcard";
+     final Client client = ClientBuilder.newClient();
+     try (Response response = client
+         .register(AddHeaderClientResponseFilter.class)
+         .target(address)
+         .request("text/plain")
+         .async()
+         .get()
+         .get()) {
+         assertEquals(200, response.getStatus());
+         assertEquals("true", response.getHeaderString("X-Done"));
+     } finally {
+         // Only the response was closed before; the client itself was leaked.
+         client.close();
+     }
+ }
+
+ /**
+  * Registers a header-adding filter and a throwing filter and expects the async PUT to fail
+  * with a {@link ResponseProcessingException} cause.
+  */
+ @Test
+ public void testExceptionWhenMultipleClientResponseFilters() {
+     final String address = "http://localhost:" + PORT + "/bookstore/books/wildcard";
+     final Client client = ClientBuilder.newClient();
+     try (Response response = client
+         .register(AddHeaderClientResponseFilter.class)
+         .register(FaultyClientResponseFilter.class)
+         .target(address)
+         .request()
+         .async()
+         .put(null)
+         .get(10, TimeUnit.SECONDS)) {
+         fail("Should not be invoked");
+     } catch (ExecutionException ex) {
+         assertThat(ex.getCause(), is(instanceOf(ResponseProcessingException.class)));
+     } catch (AssertionError ex) {
+         // Let the fail() above surface with its own message instead of being re-labelled below.
+         throw ex;
+     } catch (Throwable ex) {
+         fail("Should be handled by ResponseProcessingException block");
+     } finally {
+         // Only the response was closed before; the client itself was leaked.
+         client.close();
+     }
+ }
+
+ /**
+  * Registers a throwing response filter and expects the async GET to fail with a
+  * {@link ResponseProcessingException} cause.
+  */
+ @Test
+ public void testExceptionInClientResponseFilter() throws Exception {
+     final String address = "http://localhost:" + PORT + "/bookstore/books/wildcard";
+     final Client client = ClientBuilder.newClient();
+     try (Response response = client
+         .register(FaultyClientResponseFilter.class)
+         .target(address)
+         .request("text/plain")
+         .async()
+         .get()
+         .get(10, TimeUnit.SECONDS)) {
+         fail("Should raise ResponseProcessingException");
+     } catch (ExecutionException ex) {
+         assertThat(ex.getCause(), is(instanceOf(ResponseProcessingException.class)));
+     } catch (AssertionError ex) {
+         // Let the fail() above surface with its own message instead of being re-labelled below.
+         throw ex;
+     } catch (Throwable ex) {
+         fail("Should be handled by ResponseProcessingException block");
+     } finally {
+         // Only the response was closed before; the client itself was leaked.
+         client.close();
+     }
+ }
+
+ /**
+  * Registers a throwing response filter, PUTs to the "/bookstore/notFound" path and expects
+  * the future to fail with a {@link ResponseProcessingException} cause.
+  */
+ @Test
+ public void testExceptionInClientResponseFilterWhenNotFound() throws Exception {
+     final String address = "http://localhost:" + PORT + "/bookstore/notFound";
+     final Client client = ClientBuilder.newClient();
+     try (Response response = client
+         .register(FaultyClientResponseFilter.class)
+         .target(address)
+         .request("text/plain")
+         .async()
+         .put(null)
+         .get(10, TimeUnit.SECONDS)) {
+         fail("Should not be invoked");
+     } catch (ExecutionException ex) {
+         assertThat(ex.getCause(), is(instanceOf(ResponseProcessingException.class)));
+     } catch (AssertionError ex) {
+         // Let the fail() above surface with its own message instead of being re-labelled below.
+         throw ex;
+     } catch (Throwable ex) {
+         fail("Should be handled by ResponseProcessingException block");
+     } finally {
+         // Only the response was closed before; the client itself was leaked.
+         client.close();
+     }
+ }
+
+ /** PUTs asynchronously to the "/bookstore/notFound" path and expects a plain 404 response. */
+ @Test
+ public void testNotFound() throws Exception {
+     final String address = "http://localhost:" + PORT + "/bookstore/notFound";
+     final Client client = ClientBuilder.newClient();
+     try (Response response = client
+         .target(address)
+         .request("text/plain")
+         .async()
+         .put(null)
+         .get(10, TimeUnit.SECONDS)) {
+         assertThat(response.getStatus(), equalTo(404));
+     } finally {
+         // Only the response was closed before; the client itself was leaked.
+         client.close();
+     }
+ }
+
+ /**
+  * Checks that the status (503), media type, custom header and body of the
+  * "statusFromStream" response reach an async client.
+  */
+ @Test
+ public void testStatusAngHeadersFromStream() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/books/statusFromStream";
+     WebClient wc = createWebClient(address).accept("text/xml");
+     try {
+         Response r = wc.async().get().get();
+         assertEquals(503, r.getStatus());
+         assertEquals("text/custom+plain", r.getMediaType().toString());
+         assertEquals("CustomValue", r.getHeaderString("CustomHeader"));
+         assertEquals("Response is not available", r.readEntity(String.class));
+     } finally {
+         // The client was previously never closed by this test.
+         wc.close();
+     }
+ }
+
+ /** Reads a book from the "streamingresponse" endpoint through an async client. */
+ @Test
+ public void testBookAsStream() throws Exception {
+     String address = "http://localhost:" + PORT + "/bookstore/books/streamingresponse";
+     WebClient wc = createWebClient(address).accept("text/xml");
+     try {
+         Response r = wc.async().get().get();
+         assertEquals(200, r.getStatus());
+         final Book book = r.readEntity(Book.class);
+         assertThat(book.getId(), equalTo(124L));
+         assertThat(book.getName(), equalTo("stream"));
+     } finally {
+         // The client was previously never closed by this test.
+         wc.close();
+     }
+ }
+
+ /** Expects exactly one "Set-Cookie" header value on the async response. */
+ @Test
+ public void testSetCookieWebClient() throws Exception {
+     final String address = "http://localhost:" + PORT + "/bookstore/setcookies";
+     WebClient client = createWebClient(address);
+     try {
+         Response r = client.type("*/*").async().get().get();
+         assertEquals(200, r.getStatus());
+         List<Object> cookies = r.getMetadata().get("Set-Cookie");
+         assertNotNull(cookies);
+         assertEquals(1, cookies.size());
+     } finally {
+         // The client was previously never closed by this test.
+         client.close();
+     }
+ }
+
+
+ /** Posts without an entity and expects a 204 response whose body reads as an empty string. */
+ @Test
+ public void testBookNoContent() throws Exception {
+     final String address = "http://localhost:" + PORT + "/bookstore/no-content";
+     WebClient client = createWebClient(address);
+     try {
+         Response r = client.type("*/*").async().post(null).get();
+         assertEquals(204, r.getStatus());
+         assertThat(r.readEntity(String.class), equalTo(""));
+     } finally {
+         // The client was previously never closed by this test.
+         client.close();
+     }
+ }
+
+ /**
+  * Posts to the "oneway" endpoint and expects 202 with no entity, with the response filter's
+  * "X-Filter" header still present.
+  */
+ @Test
+ public void testBookOneway() throws Exception {
+     final String address = "http://localhost:" + PORT + "/bookstore/oneway";
+     WebClient client = createWebClient(address, new TestResponseFilter());
+     try {
+         Response r = client.type("*/*").async().post(null).get();
+         assertEquals(202, r.getStatus());
+         assertThat(r.getEntity(), is(nullValue()));
+         assertThat(r.getHeaderString("X-Filter"), equalTo("true"));
+     } finally {
+         // The client was previously never closed by this test.
+         client.close();
+     }
+ }
+
+ private WebClient createWebClient(String address, Object ... providers) { // builds a WebClient for address with the given providers registered
+ final WebClient wc = WebClient.create(address, Arrays.asList(providers));
+ WebClient.getConfig(wc).getRequestContext().put(AsyncHTTPConduit.USE_ASYNC, true); // every client built here carries the USE_ASYNC request property
+ return wc;
+ }
+
+ private InvocationCallback<Object> createCallback(final Holder<Object> holder) { // callback that records its outcome in holder
+ return new InvocationCallback<Object>() {
+ public void completed(Object response) {
+ holder.value = response; // success: keep the response object
+ }
+ public void failed(Throwable error) {
+ holder.value = error; // failure: keep the throwable so tests can compare it by identity
+ }
+ };
+ }
+
+ /**
+  * Message body writer that claims to handle every type and always fails while writing,
+  * used to provoke a client-side processing failure.
+  */
+ @Produces("application/xml")
+ private static class FaultyBookWriter implements MessageBodyWriter<Book> {
+     @Override
+     public long getSize(Book book, Class<?> type, Type genericType, Annotation[] annotations,
+                         MediaType mediaType) {
+         return 0;
+     }
+
+     @Override
+     public boolean isWriteable(Class<?> type, Type genericType, Annotation[] annotations,
+                                MediaType mediaType) {
+         return true;
+     }
+
+     @Override
+     public void writeTo(Book book, Class<?> type, Type genericType, Annotation[] annotations,
+                         MediaType mediaType, MultivaluedMap<String, Object> httpHeaders,
+                         OutputStream entityStream) throws IOException, WebApplicationException {
+         // Always fail: nothing is ever written to the entity stream.
+         throw new RuntimeException();
+     }
+ }
+
+ /**
+  * Message body reader that claims to handle every type and always fails while reading,
+  * used to provoke a response processing failure.
+  */
+ @Consumes("application/xml")
+ private static class FaultyBookReader implements MessageBodyReader<Book> {
+     @Override
+     public boolean isReadable(Class<?> type, Type genericType, Annotation[] annotations,
+                               MediaType mediaType) {
+         return true;
+     }
+
+     @Override
+     public Book readFrom(Class<Book> type, Type genericType, Annotation[] annotations,
+                          MediaType mediaType, MultivaluedMap<String, String> httpHeaders,
+                          InputStream entityStream) throws IOException, WebApplicationException {
+         // Always fail: the entity stream is never consumed.
+         throw new RuntimeException();
+     }
+ }
+
+ public static class TestResponseFilter implements ClientResponseFilter { // marks every filtered response with an "X-Filter" header
+ @Override
+ public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext)
+ throws IOException {
+ responseContext.getHeaders().add("X-Filter", "true"); // tests assert on this header to prove the filter ran
+ }
+ }
+
+ private static class GenericInvocationCallback<T> implements InvocationCallback<T> { // callback with an unbound type parameter that stores whatever it is given
+ private Object result;
+
+ @Override
+ public void completed(final Object o) {
+ result = o; // kept untyped; getResult() casts it
+ }
+
+ @Override
+ public void failed(final Throwable throwable) {
+ // failures are ignored; result keeps its previous value
+ }
+
+ public Response getResult() {
+ return (Response)result; // assumes the invocation delivered a Response; any other type raises ClassCastException
+ }
+ }
+
+ @Priority(2)
+ public static class AddHeaderClientResponseFilter implements ClientResponseFilter { // marks every filtered response with an "X-Done" header
+ @Override
+ public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext)
+ throws IOException {
+ responseContext.getHeaders().add("X-Done", "true"); // tests assert on this header to prove the filter ran
+ }
+ }
+
+ @Priority(1)
+ public static class FaultyClientResponseFilter implements ClientResponseFilter { // response filter that fails unconditionally
+ @Override
+ public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext)
+ throws IOException {
+ throw new IOException("Exception from client response filter"); // tests expect this to surface as a ResponseProcessingException cause
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private static <T> InvocationCallback<T> createGenericInvocationCallback() { // factory that leaves T for the caller to infer
+ return new GenericInvocationCallback(); // raw instantiation is deliberate, hence the suppressed warnings
+ }
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/RETRIEVE.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/RETRIEVE.java
new file mode 100644
index 0000000..9bc0838
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/RETRIEVE.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.hc5.jaxrs;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.ws.rs.HttpMethod;
+
+@Target({ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+@HttpMethod("RETRIEVE")
+public @interface RETRIEVE { // method-level, runtime-retained annotation carrying the non-standard HTTP method name "RETRIEVE"
+
+}
diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxws/JAXWSAsyncClientTest.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxws/JAXWSAsyncClientTest.java
new file mode 100644
index 0000000..b458714
--- /dev/null
+++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxws/JAXWSAsyncClientTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.systest.hc5.jaxws;
+
+import java.util.concurrent.ExecutionException;
+
+import javax.jws.WebService;
+import javax.xml.ws.Response;
+import javax.xml.ws.soap.SOAPFaultException;
+
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.greeter_control.AbstractGreeterImpl;
+import org.apache.cxf.greeter_control.Greeter;
+import org.apache.cxf.greeter_control.types.GreetMeResponse;
+import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+import org.apache.cxf.transport.http.HTTPConduit;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class JAXWSAsyncClientTest extends AbstractBusClientServerTestBase {
+ static final String PORT = allocatePort(Server.class);
+
+ /**
+  * Test server that publishes a greeter endpoint whose {@code greetMe} delays its reply
+  * when called with the argument "timeout".
+  */
+ public static class Server extends AbstractBusTestServerBase {
+
+     /** Publishes the greeter implementation at the test port. */
+     protected void run() {
+         GreeterImpl implementor = new GreeterImpl();
+         String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort";
+         javax.xml.ws.Endpoint.publish(address, implementor);
+     }
+
+     /** Starts the server standalone; exits with status -1 if start-up throws. */
+     public static void main(String[] args) {
+         try {
+             Server s = new Server();
+             s.start();
+         } catch (Exception ex) {
+             ex.printStackTrace();
+             System.exit(-1);
+         } finally {
+             System.out.println("done!");
+         }
+     }
+
+     @WebService(serviceName = "BasicGreeterService",
+                 portName = "GreeterPort",
+                 endpointInterface = "org.apache.cxf.greeter_control.Greeter",
+                 targetNamespace = "http://cxf.apache.org/greeter_control",
+                 wsdlLocation = "testutils/greeter_control.wsdl")
+     public class GreeterImpl extends AbstractGreeterImpl {
+         /**
+          * Delegates to the parent implementation, sleeping one second first when
+          * {@code arg} equals "timeout" (case-insensitive).
+          */
+         @Override
+         public String greetMe(String arg) {
+             if ("timeout".equalsIgnoreCase(arg)) {
+                 try {
+                     Thread.sleep(1000);
+                 } catch (InterruptedException e) {
+                     // Restore the interrupt flag so the calling thread's owner can still see it.
+                     Thread.currentThread().interrupt();
+                 }
+             }
+
+             return super.greetMe(arg);
+         }
+     }
+ }
+
+
+ @BeforeClass
+ public static void startServers() throws Exception {
+ assertTrue("server did not launch correctly", launchServer(Server.class, true)); // abort the class if the greeter server cannot be launched
+ }
+
+ @AfterClass
+ public static void stopServers() throws Exception {
+ stopAllServers(); // shut down whatever startServers launched once all tests have run
+ }
+
+ /**
+  * Invokes {@code greetMeAsync} through a JAX-WS proxy and expects the call to complete
+  * with a result within roughly 15 seconds.
+  */
+ @Test
+ public void testAsyncClient() throws Exception {
+     // setup the feature by using JAXWS front-end API
+     JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+     factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort");
+     factory.setServiceClass(Greeter.class);
+     Greeter proxy = factory.create(Greeter.class);
+
+     Response<GreetMeResponse> response = proxy.greetMeAsync("cxf");
+     int waitCount = 0;
+     while (!response.isDone() && waitCount < 15) {
+         Thread.sleep(1000);
+         waitCount++;
+     }
+
+     assertTrue("Response still not received.", response.isDone());
+     // isDone() is also true for a failed call; get() rethrows such a failure as ExecutionException.
+     assertTrue("Response carries no result.", response.get() != null);
+ }
+
+ @Test
+ public void testTimeout() throws Exception {
+ // setup the feature by using JAXWS front-end API
+ JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
+ factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort");
+ factory.setServiceClass(Greeter.class);
+ Greeter proxy = factory.create(Greeter.class);
+
+ HTTPConduit cond = (HTTPConduit)((Client)proxy).getConduit();
+ cond.getClient().setReceiveTimeout(500); // shorter than the 1000 ms the test server sleeps for the "timeout" argument
+
+ try {
+ proxy.greetMeAsync("timeout").get();
+ fail("Should have faulted");
+ } catch (SOAPFaultException ex) {
+ fail("should not be a SOAPFaultException");
+ } catch (ExecutionException ex) {
+ //expected
+ assertTrue(ex.getCause().getClass().getName(), // the cause's class name is the failure message
+ ex.getCause() instanceof java.net.ConnectException
+ || ex.getCause() instanceof java.net.SocketTimeoutException);
+ }
+ }
+}