You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/09/28 19:10:48 UTC

svn commit: r1391557 - in /cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient: AsyncHTTPConduit.java AsyncHTTPConduitFactory.java CXFAsyncRequester.java

Author: dkulp
Date: Fri Sep 28 17:10:48 2012
New Revision: 1391557

URL: http://svn.apache.org/viewvc?rev=1391557&view=rev
Log:
[CXF-4525] Refactor the handling of the DefaultHttpAsyncClient to create one per conduit, make it accessible to applications, etc...
Hook up the connection timeout as well.

Removed:
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
Modified:
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1391557&r1=1391556&r2=1391557&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Fri Sep 28 17:10:48 2012
@@ -45,6 +45,7 @@ import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLSession;
 import javax.net.ssl.X509KeyManager;
 
@@ -76,9 +77,15 @@ import org.apache.http.auth.Credentials;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.client.protocol.ClientContext;
+import org.apache.http.concurrent.BasicFuture;
 import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.conn.params.ConnRouteParams;
 import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
+import org.apache.http.nio.conn.scheme.AsyncScheme;
+import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
+import org.apache.http.nio.conn.ssl.SSLLayeringStrategy;
+import org.apache.http.nio.reactor.IOSession;
 import org.apache.http.nio.util.HeapByteBufferAllocator;
 import org.apache.http.params.CoreConnectionPNames;
 import org.apache.http.protocol.BasicHttpContext;
@@ -93,6 +100,7 @@ public class AsyncHTTPConduit extends UR
     volatile int lastTlsHash = -1;
     volatile Object sslState; 
     volatile SSLContext sslContext;
+    volatile DefaultHttpAsyncClient client;
     
     public AsyncHTTPConduit(Bus b, 
                             EndpointInfo ei, 
@@ -102,6 +110,16 @@ public class AsyncHTTPConduit extends UR
         this.factory = factory;
     }
 
+    public synchronized DefaultHttpAsyncClient getHttpAsyncClient() throws IOException {
+        if (client == null) {
+            client = factory.createClient(this);
+        }
+        return client;
+    }
+    public AsyncHTTPConduitFactory getAsyncHTTPConduitFactory() {
+        return factory;
+    }
+    
     protected void setupConnection(Message message, URI uri, HTTPClientPolicy csPolicy) throws IOException {
         if (factory.isShutdown()) {
             message.put(USE_ASYNC, Boolean.FALSE);
@@ -407,16 +425,42 @@ public class AsyncHTTPConduit extends UR
             if (tlsClientParameters != null && tlsClientParameters.hashCode() == lastTlsHash && sslState != null) {
                 ctx.setAttribute(ClientContext.USER_TOKEN , sslState);
             }
-            connectionFuture = factory.getRequester().execute(
-                        AsyncHTTPConduit.this,
-                        url,
-                        csPolicy.getConnectionTimeout(),
-                        new CXFHttpAsyncRequestProducer(entity, outbuf),
-                        new CXFHttpAsyncResponseConsumer(inbuf, responseCallback),
-                        ctx,
-                        callback);
+            
+            final AsyncSchemeRegistry reg = new AsyncSchemeRegistry();
+            reg.register(new AsyncScheme("http", 80, null));
+            if ("https".equals(url.getScheme())) {
+                try {
+                    final SSLContext sslcontext = getSSLContext();
+                    reg.register(new AsyncScheme("https", 443, new SSLLayeringStrategy(sslcontext) {
+                        @Override
+                        protected void initializeEngine(SSLEngine engine) {
+                            initializeSSLEngine(sslcontext, engine);
+                        }
+                        @Override
+                        protected void verifySession(final IOSession iosession,
+                                              final SSLSession sslsession) throws SSLException {
+                            super.verifySession(iosession, sslsession);
+                            iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
+                            CXFHttpRequest req = (CXFHttpRequest)iosession
+                                .removeAttribute(CXFHttpRequest.class.getName());
+                            if (req != null) {
+                                req.getOutputStream().setSSLSession(sslsession);
+                            }
+                        }
+                    }));
+                } catch (GeneralSecurityException e) {
+                    // TODO Auto-generated catch block
+                    e.printStackTrace();
+                }
+            }
+            ctx.setAttribute(ClientContext.SCHEME_REGISTRY, reg);
+            connectionFuture = new BasicFuture<Boolean>(callback);
+            getHttpAsyncClient().execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
+                                         new CXFHttpAsyncResponseConsumer(inbuf, responseCallback),
+                                         ctx,
+                                         callback);
         }
-
+        
         protected synchronized void setHttpResponse(HttpResponse r) {
             httpResponse = r;
             if (isAsync) {

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1391557&r1=1391556&r2=1391557&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Fri Sep 28 17:10:48 2012
@@ -37,10 +37,23 @@ import org.apache.cxf.transport.http.HTT
 import org.apache.cxf.transport.http.HTTPTransportFactory;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
 import org.apache.http.HttpResponseFactory;
+import org.apache.http.HttpVersion;
+import org.apache.http.ProtocolException;
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.protocol.RequestAuthCache;
+import org.apache.http.client.protocol.RequestClientConnControl;
+import org.apache.http.client.protocol.RequestDefaultHeaders;
+import org.apache.http.client.protocol.RequestProxyAuthentication;
+import org.apache.http.client.protocol.RequestTargetAuthentication;
 import org.apache.http.impl.DefaultHttpResponseFactory;
 import org.apache.http.impl.client.EntityEnclosingRequestWrapper;
+import org.apache.http.impl.client.ProxyAuthenticationStrategy;
+import org.apache.http.impl.client.TargetAuthenticationStrategy;
 import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
 import org.apache.http.impl.nio.conn.DefaultClientAsyncConnection;
 import org.apache.http.impl.nio.conn.PoolingClientAsyncConnectionManager;
 import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
@@ -58,7 +71,16 @@ import org.apache.http.nio.reactor.ssl.S
 import org.apache.http.nio.util.ByteBufferAllocator;
 import org.apache.http.nio.util.HeapByteBufferAllocator;
 import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
+import org.apache.http.params.SyncBasicHttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.protocol.RequestUserAgent;
 
 /**
  * 
@@ -91,9 +113,9 @@ public class AsyncHTTPConduitFactory imp
     public static enum UseAsyncPolicy {
         ALWAYS, ASYNC_ONLY, NEVER
     };
+        
     
     final IOReactorConfig config = new IOReactorConfig();
-    volatile CXFAsyncRequester requester;
     volatile ConnectingIOReactor ioReactor;
     volatile PoolingClientAsyncConnectionManager connectionManager;
     
@@ -102,21 +124,49 @@ public class AsyncHTTPConduitFactory imp
     int maxConnections = 5000;
     int maxPerRoute = 1000;
     int connectionTTL = 60000;
+
     
+    // these have per-instance Logger instances that have sync methods to setup.
+    private final TargetAuthenticationStrategy targetAuthenticationStrategy = new TargetAuthenticationStrategy();
+    private final ProxyAuthenticationStrategy proxyAuthenticationStrategy = new ProxyAuthenticationStrategy();
+    private final BasicHttpProcessor httpproc;
     
-    public AsyncHTTPConduitFactory(Map<String, Object> conf) {
+    AsyncHTTPConduitFactory() {
         super();
+        httpproc = new BasicHttpProcessor();
+        httpproc.addInterceptor(new RequestDefaultHeaders());
+        // Required protocol interceptors
+        httpproc.addInterceptor(new RequestContent());
+        httpproc.addInterceptor(new RequestTargetHost());
+        // Recommended protocol interceptors
+        httpproc.addInterceptor(new RequestClientConnControl());
+        httpproc.addInterceptor(new RequestUserAgent());
+        httpproc.addInterceptor(new RequestExpectContinue());
+        // HTTP authentication interceptors
+        httpproc.addInterceptor(new RequestAuthCache());
+        httpproc.addInterceptor(new RequestTargetAuthentication());
+        httpproc.addInterceptor(new RequestProxyAuthentication());        
+
+    }
+    public AsyncHTTPConduitFactory(Map<String, Object> conf) {
+        this();
         config.setTcpNoDelay(true);
         setProperties(conf);
     }
     
     
     public AsyncHTTPConduitFactory(Bus b) {
+        this();
         addListener(b);
         config.setTcpNoDelay(true);
         setProperties(b.getProperties());
     }
     
+    
+    public BasicHttpProcessor getDefaultHttpProcessor() {
+        return httpproc;
+    }
+    
     public UseAsyncPolicy getUseAsyncPolicy() {
         return policy;
     }
@@ -133,7 +183,6 @@ public class AsyncHTTPConduitFactory imp
         shutdown(ioReactor2, connectionManager2);
     }
     private synchronized void resetVars() {
-        requester = null;
         ioReactor = null;
         connectionManager = null;
     }
@@ -248,7 +297,6 @@ public class AsyncHTTPConduitFactory imp
             shutdown(ioReactor, connectionManager);
             connectionManager = null;
             ioReactor = null;
-            requester = null;
         }
         isShutdown = true;
     }
@@ -274,7 +322,7 @@ public class AsyncHTTPConduitFactory imp
     
     
     public synchronized void setupNIOClient() throws IOReactorException {
-        if (requester != null) {
+        if (connectionManager != null) {
             return;
         }
         // Create client-side I/O reactor
@@ -341,15 +389,43 @@ public class AsyncHTTPConduitFactory imp
         };
         connectionManager.setDefaultMaxPerRoute(maxPerRoute);
         connectionManager.setMaxTotal(maxConnections);
-        requester = new CXFAsyncRequester(connectionManager);
     }
     
-    public CXFAsyncRequester getRequester() throws IOException {
-        if (requester == null) {
+    public DefaultHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException {
+        if (connectionManager == null) {
             setupNIOClient();
         }
-
-        return requester;
+        
+        DefaultHttpAsyncClient dhac = new DefaultHttpAsyncClient(connectionManager) {
+            @Override
+            protected HttpParams createHttpParams() {
+                super.createHttpParams();
+                HttpParams params = new SyncBasicHttpParams();
+                HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
+                HttpConnectionParams.setTcpNoDelay(params, true);
+                HttpConnectionParams.setSocketBufferSize(params, 16332);
+                HttpConnectionParams.setConnectionTimeout(params, (int)c.getClient().getConnectionTimeout());
+                return params;
+            }
+            @Override
+            protected BasicHttpProcessor createHttpProcessor() {
+                return httpproc;
+            }            
+        };
+        //CXF handles redirects ourselves
+        dhac.setRedirectStrategy(new RedirectStrategy() {
+            public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context)
+                throws ProtocolException {
+                return false;
+            }
+            public HttpUriRequest getRedirect(HttpRequest request, HttpResponse response, HttpContext context)
+                throws ProtocolException {
+                return null;
+            }
+        });
+        dhac.setTargetAuthenticationStrategy(targetAuthenticationStrategy);
+        dhac.setProxyAuthenticationStrategy(proxyAuthenticationStrategy);
+        return dhac;
     }