You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/28 19:10:48 UTC
svn commit: r1391557 - in
/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient:
AsyncHTTPConduit.java AsyncHTTPConduitFactory.java CXFAsyncRequester.java
Author: dkulp
Date: Fri Sep 28 17:10:48 2012
New Revision: 1391557
URL: http://svn.apache.org/viewvc?rev=1391557&view=rev
Log:
[CXF-4525] Refactor the handling of the DefaultHttpAsyncClient to create one per conduit, make it accessible to applications, etc...
Hook up the connection timeout as well.
Removed:
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
Modified:
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1391557&r1=1391556&r2=1391557&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Fri Sep 28 17:10:48 2012
@@ -45,6 +45,7 @@ import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.X509KeyManager;
@@ -76,9 +77,15 @@ import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.protocol.ClientContext;
+import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.conn.params.ConnRouteParams;
import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
+import org.apache.http.nio.conn.scheme.AsyncScheme;
+import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
+import org.apache.http.nio.conn.ssl.SSLLayeringStrategy;
+import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.protocol.BasicHttpContext;
@@ -93,6 +100,7 @@ public class AsyncHTTPConduit extends UR
volatile int lastTlsHash = -1;
volatile Object sslState;
volatile SSLContext sslContext;
+ volatile DefaultHttpAsyncClient client;
public AsyncHTTPConduit(Bus b,
EndpointInfo ei,
@@ -102,6 +110,16 @@ public class AsyncHTTPConduit extends UR
this.factory = factory;
}
+ public synchronized DefaultHttpAsyncClient getHttpAsyncClient() throws IOException {
+ if (client == null) {
+ client = factory.createClient(this);
+ }
+ return client;
+ }
+ public AsyncHTTPConduitFactory getAsyncHTTPConduitFactory() {
+ return factory;
+ }
+
protected void setupConnection(Message message, URI uri, HTTPClientPolicy csPolicy) throws IOException {
if (factory.isShutdown()) {
message.put(USE_ASYNC, Boolean.FALSE);
@@ -407,16 +425,42 @@ public class AsyncHTTPConduit extends UR
if (tlsClientParameters != null && tlsClientParameters.hashCode() == lastTlsHash && sslState != null) {
ctx.setAttribute(ClientContext.USER_TOKEN , sslState);
}
- connectionFuture = factory.getRequester().execute(
- AsyncHTTPConduit.this,
- url,
- csPolicy.getConnectionTimeout(),
- new CXFHttpAsyncRequestProducer(entity, outbuf),
- new CXFHttpAsyncResponseConsumer(inbuf, responseCallback),
- ctx,
- callback);
+
+ final AsyncSchemeRegistry reg = new AsyncSchemeRegistry();
+ reg.register(new AsyncScheme("http", 80, null));
+ if ("https".equals(url.getScheme())) {
+ try {
+ final SSLContext sslcontext = getSSLContext();
+ reg.register(new AsyncScheme("https", 443, new SSLLayeringStrategy(sslcontext) {
+ @Override
+ protected void initializeEngine(SSLEngine engine) {
+ initializeSSLEngine(sslcontext, engine);
+ }
+ @Override
+ protected void verifySession(final IOSession iosession,
+ final SSLSession sslsession) throws SSLException {
+ super.verifySession(iosession, sslsession);
+ iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
+ CXFHttpRequest req = (CXFHttpRequest)iosession
+ .removeAttribute(CXFHttpRequest.class.getName());
+ if (req != null) {
+ req.getOutputStream().setSSLSession(sslsession);
+ }
+ }
+ }));
+ } catch (GeneralSecurityException e) {
+ // TODO: handle this properly - the exception is only printed and no "https" scheme gets registered
+ e.printStackTrace();
+ }
+ }
+ ctx.setAttribute(ClientContext.SCHEME_REGISTRY, reg);
+ connectionFuture = new BasicFuture<Boolean>(callback);
+ getHttpAsyncClient().execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
+ new CXFHttpAsyncResponseConsumer(inbuf, responseCallback),
+ ctx,
+ callback);
}
-
+
protected synchronized void setHttpResponse(HttpResponse r) {
httpResponse = r;
if (isAsync) {
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1391557&r1=1391556&r2=1391557&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Fri Sep 28 17:10:48 2012
@@ -37,10 +37,23 @@ import org.apache.cxf.transport.http.HTT
import org.apache.cxf.transport.http.HTTPTransportFactory;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseFactory;
+import org.apache.http.HttpVersion;
+import org.apache.http.ProtocolException;
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.RequestAuthCache;
+import org.apache.http.client.protocol.RequestClientConnControl;
+import org.apache.http.client.protocol.RequestDefaultHeaders;
+import org.apache.http.client.protocol.RequestProxyAuthentication;
+import org.apache.http.client.protocol.RequestTargetAuthentication;
import org.apache.http.impl.DefaultHttpResponseFactory;
import org.apache.http.impl.client.EntityEnclosingRequestWrapper;
+import org.apache.http.impl.client.ProxyAuthenticationStrategy;
+import org.apache.http.impl.client.TargetAuthenticationStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
import org.apache.http.impl.nio.conn.DefaultClientAsyncConnection;
import org.apache.http.impl.nio.conn.PoolingClientAsyncConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
@@ -58,7 +71,16 @@ import org.apache.http.nio.reactor.ssl.S
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
+import org.apache.http.params.SyncBasicHttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.protocol.RequestUserAgent;
/**
*
@@ -91,9 +113,9 @@ public class AsyncHTTPConduitFactory imp
public static enum UseAsyncPolicy {
ALWAYS, ASYNC_ONLY, NEVER
};
+
final IOReactorConfig config = new IOReactorConfig();
- volatile CXFAsyncRequester requester;
volatile ConnectingIOReactor ioReactor;
volatile PoolingClientAsyncConnectionManager connectionManager;
@@ -102,21 +124,49 @@ public class AsyncHTTPConduitFactory imp
int maxConnections = 5000;
int maxPerRoute = 1000;
int connectionTTL = 60000;
+
+ // these have per-instance Logger instances that have sync methods to set up.
+ private final TargetAuthenticationStrategy targetAuthenticationStrategy = new TargetAuthenticationStrategy();
+ private final ProxyAuthenticationStrategy proxyAuthenticationStrategy = new ProxyAuthenticationStrategy();
+ private final BasicHttpProcessor httpproc;
- public AsyncHTTPConduitFactory(Map<String, Object> conf) {
+ AsyncHTTPConduitFactory() {
super();
+ httpproc = new BasicHttpProcessor();
+ httpproc.addInterceptor(new RequestDefaultHeaders());
+ // Required protocol interceptors
+ httpproc.addInterceptor(new RequestContent());
+ httpproc.addInterceptor(new RequestTargetHost());
+ // Recommended protocol interceptors
+ httpproc.addInterceptor(new RequestClientConnControl());
+ httpproc.addInterceptor(new RequestUserAgent());
+ httpproc.addInterceptor(new RequestExpectContinue());
+ // HTTP authentication interceptors
+ httpproc.addInterceptor(new RequestAuthCache());
+ httpproc.addInterceptor(new RequestTargetAuthentication());
+ httpproc.addInterceptor(new RequestProxyAuthentication());
+
+ }
+ public AsyncHTTPConduitFactory(Map<String, Object> conf) {
+ this();
config.setTcpNoDelay(true);
setProperties(conf);
}
public AsyncHTTPConduitFactory(Bus b) {
+ this();
addListener(b);
config.setTcpNoDelay(true);
setProperties(b.getProperties());
}
+
+ public BasicHttpProcessor getDefaultHttpProcessor() {
+ return httpproc;
+ }
+
public UseAsyncPolicy getUseAsyncPolicy() {
return policy;
}
@@ -133,7 +183,6 @@ public class AsyncHTTPConduitFactory imp
shutdown(ioReactor2, connectionManager2);
}
private synchronized void resetVars() {
- requester = null;
ioReactor = null;
connectionManager = null;
}
@@ -248,7 +297,6 @@ public class AsyncHTTPConduitFactory imp
shutdown(ioReactor, connectionManager);
connectionManager = null;
ioReactor = null;
- requester = null;
}
isShutdown = true;
}
@@ -274,7 +322,7 @@ public class AsyncHTTPConduitFactory imp
public synchronized void setupNIOClient() throws IOReactorException {
- if (requester != null) {
+ if (connectionManager != null) {
return;
}
// Create client-side I/O reactor
@@ -341,15 +389,43 @@ public class AsyncHTTPConduitFactory imp
};
connectionManager.setDefaultMaxPerRoute(maxPerRoute);
connectionManager.setMaxTotal(maxConnections);
- requester = new CXFAsyncRequester(connectionManager);
}
- public CXFAsyncRequester getRequester() throws IOException {
- if (requester == null) {
+ public DefaultHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException {
+ if (connectionManager == null) {
setupNIOClient();
}
-
- return requester;
+
+ DefaultHttpAsyncClient dhac = new DefaultHttpAsyncClient(connectionManager) {
+ @Override
+ protected HttpParams createHttpParams() {
+ super.createHttpParams();
+ HttpParams params = new SyncBasicHttpParams();
+ HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
+ HttpConnectionParams.setTcpNoDelay(params, true);
+ HttpConnectionParams.setSocketBufferSize(params, 16332);
+ HttpConnectionParams.setConnectionTimeout(params, (int)c.getClient().getConnectionTimeout());
+ return params;
+ }
+ @Override
+ protected BasicHttpProcessor createHttpProcessor() {
+ return httpproc;
+ }
+ };
+ //CXF handles redirects itself
+ dhac.setRedirectStrategy(new RedirectStrategy() {
+ public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context)
+ throws ProtocolException {
+ return false;
+ }
+ public HttpUriRequest getRedirect(HttpRequest request, HttpResponse response, HttpContext context)
+ throws ProtocolException {
+ return null;
+ }
+ });
+ dhac.setTargetAuthenticationStrategy(targetAuthenticationStrategy);
+ dhac.setProxyAuthenticationStrategy(proxyAuthenticationStrategy);
+ return dhac;
}