You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/10 20:28:25 UTC
svn commit: r1383023 - in /cxf/sandbox/dkulp_async_clients/http-hc: ./
src/main/java/org/apache/cxf/transport/http/asyncclient/
src/main/java/org/apache/cxf/transport/http/asyncclient/impl/
Author: dkulp
Date: Mon Sep 10 18:28:25 2012
New Revision: 1383023
URL: http://svn.apache.org/viewvc?rev=1383023&view=rev
Log:
Flip to using AsyncClient. Allows removal of a lot of code.
Removed:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequestExecutor.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnection.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpHost.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/LoggingIOSession.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/Wire.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/impl/
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
Modified: cxf/sandbox/dkulp_async_clients/http-hc/pom.xml
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/pom.xml?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/pom.xml (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/pom.xml Mon Sep 10 18:28:25 2012
@@ -73,7 +73,7 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
+ <artifactId>slf4j-jdk14</artifactId>
<version>1.6.2</version>
<scope>test</scope>
</dependency>
@@ -82,6 +82,11 @@
<artifactId>httpcore-nio</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ <version>4.0-beta3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-http-jetty</artifactId>
<version>${project.version}</version>
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Mon Sep 10 18:28:25 2012
@@ -24,10 +24,13 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
+import java.net.Proxy;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.security.GeneralSecurityException;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.ArrayList;
@@ -37,11 +40,16 @@ import java.util.concurrent.Future;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
import org.apache.cxf.Bus;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.common.util.SystemPropertyAction;
+import org.apache.cxf.configuration.jsse.SSLUtils;
import org.apache.cxf.configuration.jsse.TLSClientParameters;
import org.apache.cxf.helpers.HttpHeaderHelper;
import org.apache.cxf.io.CacheAndWriteOutputStream;
@@ -50,14 +58,22 @@ import org.apache.cxf.message.MessageUti
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.Headers;
import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
+import org.apache.cxf.transport.https.AliasedX509ExtendedKeyManager;
import org.apache.cxf.transport.https.CertificateHostnameVerifier;
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.version.Version;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.http.Header;
+import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.protocol.ClientContext;
import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.conn.params.ConnRouteParams;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.params.CoreConnectionPNames;
@@ -80,7 +96,10 @@ public class AsyncHTTPConduit extends UR
}
- AsyncHTTPConduitFactory factory;
+ final AsyncHTTPConduitFactory factory;
+ volatile int lastTlsHash = -1;
+ volatile Object sslState;
+ volatile SSLContext sslContext;
public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t,
AsyncHTTPConduitFactory factory) throws IOException {
@@ -143,6 +162,12 @@ public class AsyncHTTPConduit extends UR
e.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT,
Integer.valueOf((int) csPolicy.getReceiveTimeout()));
+ Proxy p = proxyFactory.createProxy(csPolicy , uri);
+ if (p != null) {
+ InetSocketAddress isa = (InetSocketAddress)p.address();
+ HttpHost proxy = new HttpHost(isa.getHostName(), isa.getPort());
+ ConnRouteParams.setDefaultProxy(e.getParams(), proxy);
+ }
message.put(CXFHttpRequest.class, e);
}
@@ -302,15 +327,32 @@ public class AsyncHTTPConduit extends UR
if (url.getScheme().equals("https") && tlsClientParameters == null) {
tlsClientParameters = new TLSClientParameters();
}
- CXFHttpHost host = new CXFHttpHost(url.getHost(), url.getPort(), url.getScheme(),
- tlsClientParameters, proxyFactory.createProxy(csPolicy , url));
+ BasicHttpContext ctx = new BasicHttpContext();
+ if (AsyncHTTPConduit.this.proxyAuthorizationPolicy != null
+ && AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName() != null) {
+ ctx.setAttribute(ClientContext.CREDS_PROVIDER, new CredentialsProvider() {
+ public void setCredentials(AuthScope authscope, Credentials credentials) {
+ }
+ public Credentials getCredentials(AuthScope authscope) {
+ return new UsernamePasswordCredentials(AsyncHTTPConduit.this
+ .proxyAuthorizationPolicy.getUserName(),
+ AsyncHTTPConduit.this.proxyAuthorizationPolicy.getPassword());
+ }
+ public void clear() {
+ }
+ });
+ }
+ if (tlsClientParameters != null && tlsClientParameters.hashCode() == lastTlsHash && sslState != null) {
+ ctx.setAttribute(ClientContext.USER_TOKEN , sslState);
+ }
connectionFuture = factory.getRequester().execute(
- host,
+ AsyncHTTPConduit.this,
+ url,
csPolicy.getConnectionTimeout(),
new CXFHttpAsyncRequestProducer(entity, outbuf),
new CXFHttpAsyncResponseConsumer(inbuf, responseCallback),
- new BasicHttpContext(),
+ ctx,
callback);
}
@@ -351,8 +393,8 @@ public class AsyncHTTPConduit extends UR
if (httpResponse == null) {
outbuf.shutdown();
inbuf.shutdown();
- outbuf = null;
- inbuf = null;
+ //outbuf = null;
+ //inbuf = null;
if (exception != null) {
if (exception instanceof IOException) {
@@ -562,11 +604,70 @@ public class AsyncHTTPConduit extends UR
public void setSSLSession(SSLSession sslsession) {
session = sslsession;
synchronized (sessionLock) {
+ sslState = sslsession.getLocalPrincipal();
sessionLock.notifyAll();
}
}
}
+ public synchronized SSLContext getSSLContext() throws GeneralSecurityException {
+ TLSClientParameters tlsClientParameters = getTlsClientParameters();
+ if (tlsClientParameters == null) {
+ tlsClientParameters = new TLSClientParameters();
+ }
+ int hash = tlsClientParameters.hashCode();
+ if (hash == lastTlsHash) {
+ return sslContext;
+ }
+
+ String provider = tlsClientParameters.getJsseProvider();
+
+ String protocol = tlsClientParameters.getSecureSocketProtocol() != null ? tlsClientParameters
+ .getSecureSocketProtocol() : "TLS";
+
+ SSLContext ctx = provider == null ? SSLContext.getInstance(protocol) : SSLContext
+ .getInstance(protocol, provider);
+ ctx.getClientSessionContext().setSessionTimeout(tlsClientParameters.getSslCacheTimeout());
+ KeyManager[] keyManagers = tlsClientParameters.getKeyManagers();
+ if (tlsClientParameters.getCertAlias() != null) {
+ getKeyManagersWithCertAlias(tlsClientParameters, keyManagers);
+ }
+ ctx.init(keyManagers, tlsClientParameters.getTrustManagers(),
+ tlsClientParameters.getSecureRandom());
+
+ sslContext = ctx;
+ lastTlsHash = hash;
+ sslState = null;
+ return ctx;
+ }
+
+ public void initializeSSLEngine(SSLContext sslcontext, SSLEngine sslengine) {
+ TLSClientParameters tlsClientParameters = getTlsClientParameters();
+ if (tlsClientParameters == null) {
+ tlsClientParameters = new TLSClientParameters();
+ }
+ String[] cipherSuites = SSLUtils.getCiphersuites(tlsClientParameters.getCipherSuites(),
+ SSLUtils.getSupportedCipherSuites(sslcontext),
+ tlsClientParameters.getCipherSuitesFilter(), LOG, false);
+ sslengine.setEnabledCipherSuites(cipherSuites);
+ }
+
+ protected static void getKeyManagersWithCertAlias(TLSClientParameters tlsClientParameters,
+ KeyManager[] keyManagers) throws GeneralSecurityException {
+ if (tlsClientParameters.getCertAlias() != null) {
+ for (int idx = 0; idx < keyManagers.length; idx++) {
+ if (keyManagers[idx] instanceof X509KeyManager) {
+ try {
+ keyManagers[idx] = new AliasedX509ExtendedKeyManager(tlsClientParameters.getCertAlias(),
+ (X509KeyManager)keyManagers[idx]);
+ } catch (Exception e) {
+ throw new GeneralSecurityException(e);
+ }
+ }
+ }
+ }
+ }
+
}
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Mon Sep 10 18:28:25 2012
@@ -32,23 +32,31 @@ import org.apache.cxf.service.model.Endp
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.apache.cxf.transport.http.HTTPTransportFactory.HTTPConduitFactory;
-import org.apache.cxf.transport.http.asyncclient.impl.CXFConnectionFactory;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponseFactory;
+import org.apache.http.impl.DefaultHttpResponseFactory;
+import org.apache.http.impl.client.EntityEnclosingRequestWrapper;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.conn.DefaultClientAsyncConnection;
+import org.apache.http.impl.nio.conn.PoolingClientAsyncConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.ClientAsyncConnection;
+import org.apache.http.nio.conn.ClientAsyncConnectionFactory;
+import org.apache.http.nio.conn.scheme.AsyncScheme;
+import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
+import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.ssl.SSLIOSession;
+import org.apache.http.nio.util.ByteBufferAllocator;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
-import org.apache.http.protocol.BasicHttpProcessor;
-import org.apache.http.protocol.RequestConnControl;
-import org.apache.http.protocol.RequestContent;
-import org.apache.http.protocol.RequestExpectContinue;
-import org.apache.http.protocol.RequestTargetHost;
/**
*
@@ -56,7 +64,8 @@ import org.apache.http.protocol.RequestT
@NoJSR250Annotations(unlessNull = "bus")
public class AsyncHTTPConduitFactory implements BusLifeCycleListener, HTTPConduitFactory {
CXFAsyncRequester requester;
- CXFConnectionManager connManager;
+ ConnectingIOReactor ioReactor;
+ PoolingClientAsyncConnectionManager connectionManager;
boolean isShutdown;
public AsyncHTTPConduitFactory() {
@@ -86,14 +95,21 @@ public class AsyncHTTPConduitFactory imp
public void initComplete() {
}
public synchronized void preShutdown() {
- if (connManager != null) {
+ if (ioReactor != null) {
try {
- connManager.shutdown(1000);
+ connectionManager.shutdown();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+ try {
+ ioReactor.shutdown();
} catch (IOException e) {
e.printStackTrace();
}
+ connectionManager = null;
+ ioReactor = null;
+ requester = null;
}
- connManager = null;
isShutdown = true;
}
public void postShutdown() {
@@ -111,28 +127,14 @@ public class AsyncHTTPConduitFactory imp
// HTTP parameters for the client
HttpParams params = new BasicHttpParams();
params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 16 * 1024);
- // Create HTTP protocol processing chain
- BasicHttpProcessor httpproc = new BasicHttpProcessor();
- httpproc.addInterceptor(new RequestContent());
- httpproc.addInterceptor(new RequestTargetHost());
- httpproc.addInterceptor(new RequestConnControl());
- httpproc.addInterceptor(new RequestExpectContinue());
-
- // Create client-side HTTP protocol handler
- CXFAsyncRequestExecutor protocolHandler = new CXFAsyncRequestExecutor();
- // Create client-side I/O event dispatch
- CXFConnectionFactory connFactory = new CXFConnectionFactory(params);
- final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler,
- connFactory);
+
// Create client-side I/O reactor
IOReactorConfig config = new IOReactorConfig();
config.setTcpNoDelay(true);
- final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
- // Create HTTP connection pool
- connManager = new CXFConnectionManager(ioReactor, connFactory, params);
- connManager.setDefaultMaxPerRoute(1000);
- connManager.setMaxTotal(5000);
+ final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(new HttpAsyncRequestExecutor(),
+ params);
+ ioReactor = new DefaultConnectingIOReactor(config);
// Run the I/O reactor in a separate thread
Thread t = new Thread(new Runnable() {
@@ -152,9 +154,46 @@ public class AsyncHTTPConduitFactory imp
// Start the client thread
t.start();
- requester = new CXFAsyncRequester(connManager, httpproc,
- new DefaultConnectionReuseStrategy(), params,
- connFactory);
+ AsyncSchemeRegistry registry = new AsyncSchemeRegistry();
+ registry.register(new AsyncScheme("http", 80, null));
+ registry.register(new AsyncScheme("https", 443, null));
+
+ connectionManager = new PoolingClientAsyncConnectionManager(ioReactor, registry) {
+ @Override
+ protected ClientAsyncConnectionFactory createClientAsyncConnectionFactory() {
+ final HttpResponseFactory responseFactory = new DefaultHttpResponseFactory();
+ final ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+
+ return new ClientAsyncConnectionFactory() {
+ @Override
+ public ClientAsyncConnection create(String id, IOSession iosession, HttpParams params) {
+ return new DefaultClientAsyncConnection(id, iosession,
+ responseFactory,
+ allocator, params) {
+ @Override
+ protected void onRequestSubmitted(HttpRequest request) {
+ super.onRequestSubmitted(request);
+ if (request instanceof EntityEnclosingRequestWrapper) {
+ request = ((EntityEnclosingRequestWrapper)request).getOriginal();
+ }
+ if (getIOSession() instanceof SSLIOSession) {
+ SSLIOSession sslio = (SSLIOSession)getIOSession();
+ getIOSession().setAttribute(CXFHttpRequest.class.getName(), request);
+ if (getIOSession().getAttribute("cxf.handshake.done") != null) {
+ ((CXFHttpRequest)request).getOutputStream()
+ .setSSLSession(sslio.getSSLSession());
+ }
+ }
+ }
+ };
+ }
+ };
+ }
+
+ };
+ connectionManager.setDefaultMaxPerRoute(1000);
+ connectionManager.setMaxTotal(5000);
+ requester = new CXFAsyncRequester(connectionManager);
}
public CXFAsyncRequester getRequester() throws IOException {
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java?rev=1383023&r1=1383022&r2=1383023&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java Mon Sep 10 18:28:25 2012
@@ -19,52 +19,41 @@
package org.apache.cxf.transport.http.asyncclient;
-import java.io.IOException;
+import java.net.URI;
+import java.security.GeneralSecurityException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.transport.http.asyncclient.impl.CXFConnectionFactory;
-import org.apache.cxf.transport.http.asyncclient.impl.CXFNIOPoolEntry;
-import org.apache.http.ConnectionClosedException;
-import org.apache.http.ConnectionReuseStrategy;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.http.client.protocol.ClientContext;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
-import org.apache.http.nio.NHttpClientConnection;
-import org.apache.http.nio.protocol.BasicAsyncRequestExecutionHandler;
-import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
+import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
+import org.apache.http.nio.conn.ClientAsyncConnectionManager;
+import org.apache.http.nio.conn.scheme.AsyncScheme;
+import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
+import org.apache.http.nio.conn.ssl.SSLLayeringStrategy;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
-import org.apache.http.params.HttpParams;
+import org.apache.http.nio.reactor.IOSession;
import org.apache.http.protocol.HttpContext;
-import org.apache.http.protocol.HttpProcessor;
public class CXFAsyncRequester {
- private static final Logger LOG = LogUtils.getL7dLogger(CXFAsyncRequester.class);
+ private final ClientAsyncConnectionManager caConMan;
- private final CXFConnectionManager connManager;
- private final HttpProcessor httppocessor;
- private final ConnectionReuseStrategy reuseStrategy;
- private final HttpParams params;
-
public CXFAsyncRequester(
- final CXFConnectionManager connManager,
- final HttpProcessor httppocessor,
- final ConnectionReuseStrategy reuseStrategy,
- final HttpParams params,
- CXFConnectionFactory sslConnFactory) {
+ ClientAsyncConnectionManager caConMan) {
super();
- this.connManager = connManager;
- this.httppocessor = httppocessor;
- this.reuseStrategy = reuseStrategy;
- this.params = params;
+ this.caConMan = caConMan;
}
public <T> Future<T> execute(
- final CXFHttpHost target,
+ final AsyncHTTPConduit conduit,
+ final URI uri,
final long connectionTimeout,
final HttpAsyncRequestProducer requestProducer,
final HttpAsyncResponseConsumer<T> responseConsumer,
@@ -80,134 +69,41 @@ public class CXFAsyncRequester {
throw new IllegalArgumentException("HTTP context may not be null");
}
BasicFuture<T> future = new BasicFuture<T>(callback);
- this.connManager.leaseConnection(
- target, null,
- connectionTimeout, TimeUnit.MILLISECONDS,
- new ConnRequestCallback<T>(
- future, requestProducer, responseConsumer, context));
- return future;
- }
-
- class ConnRequestCallback<T> implements FutureCallback<CXFNIOPoolEntry> {
-
- private final BasicFuture<T> requestFuture;
- private final HttpAsyncRequestProducer requestProducer;
- private final HttpAsyncResponseConsumer<T> responseConsumer;
- private final HttpContext context;
-
- ConnRequestCallback(
- final BasicFuture<T> requestFuture,
- final HttpAsyncRequestProducer requestProducer,
- final HttpAsyncResponseConsumer<T> responseConsumer,
- final HttpContext context) {
- super();
- this.requestFuture = requestFuture;
- this.requestProducer = requestProducer;
- this.responseConsumer = responseConsumer;
- this.context = context;
- }
-
- public void completed(final CXFNIOPoolEntry result) {
- if (this.requestFuture.isDone()) {
- connManager.releaseConnection(result, 0, null);
- return;
- }
- NHttpClientConnection conn = result.getConnection();
- BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
- this.requestProducer, this.responseConsumer,
- new RequestExecutionCallback<T>(this.requestFuture, result),
- this.context, httppocessor, reuseStrategy, params);
- conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
- conn.requestOutput();
- if (!conn.isOpen()) {
- handler.failed(new ConnectionClosedException("Connection closed"));
- try {
- handler.close();
- } catch (IOException ex) {
- LOG.log(Level.SEVERE, ex.getMessage(), ex);
- }
- }
- }
-
- public void failed(final Exception ex) {
- try {
- try {
- this.responseConsumer.failed(ex);
- } finally {
- releaseResources();
- }
- } finally {
- this.requestFuture.failed(ex);
- }
- }
-
- public void cancelled() {
- try {
- try {
- this.responseConsumer.cancel();
- } finally {
- releaseResources();
- }
- } finally {
- this.requestFuture.cancel(true);
- }
- }
-
- public void releaseResources() {
- try {
- this.requestProducer.close();
- } catch (IOException ioex) {
- LOG.log(Level.SEVERE, ioex.getMessage(), ioex);
- }
- try {
- this.responseConsumer.close();
- } catch (IOException ioex) {
- LOG.log(Level.SEVERE, ioex.getMessage(), ioex);
- }
- }
-
- }
-
- class RequestExecutionCallback<T> implements FutureCallback<T> {
-
- private final BasicFuture<T> future;
- private final CXFNIOPoolEntry poolEntry;
-
- RequestExecutionCallback(
- final BasicFuture<T> future,
- final CXFNIOPoolEntry poolEntry) {
- super();
- this.future = future;
- this.poolEntry = poolEntry;
- }
-
- public void completed(final T result) {
- try {
- // Keep alive indefinitely
- connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
- } finally {
- this.future.completed(result);
- }
- }
-
- public void failed(final Exception ex) {
- try {
- this.poolEntry.close();
- connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
- } finally {
- this.future.failed(ex);
- }
- }
-
- public void cancelled() {
- try {
- this.poolEntry.close();
- connManager.releaseConnection(this.poolEntry, 0, TimeUnit.MILLISECONDS);
- } finally {
- this.future.cancel(true);
- }
- }
+ final AsyncSchemeRegistry reg = new AsyncSchemeRegistry();
+ reg.register(new AsyncScheme("http", 80, null));
+
+ if ("https".equals(uri.getScheme())) {
+ try {
+ final SSLContext sslcontext = conduit.getSSLContext();
+ reg.register(new AsyncScheme("https", 443, new SSLLayeringStrategy(sslcontext) {
+ @Override
+ protected void initializeEngine(SSLEngine engine) {
+ conduit.initializeSSLEngine(sslcontext, engine);
+ }
+ @Override
+ protected void verifySession(final IOSession iosession,
+ final SSLSession sslsession) throws SSLException {
+ super.verifySession(iosession, sslsession);
+ iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
+ CXFHttpRequest req = (CXFHttpRequest)iosession
+ .removeAttribute(CXFHttpRequest.class.getName());
+ if (req != null) {
+ req.getOutputStream().setSSLSession(sslsession);
+ }
+ }
+ }));
+ } catch (GeneralSecurityException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ DefaultHttpAsyncClient dhac = new DefaultHttpAsyncClient(caConMan);
+ context.setAttribute(ClientContext.SCHEME_REGISTRY, reg);
+ dhac.execute(requestProducer, responseConsumer, context, callback);
+ return future;
}
-
+
+
+
}