You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/31 23:26:43 UTC

svn commit: r1379619 - /cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/

Author: dkulp
Date: Fri Aug 31 21:26:42 2012
New Revision: 1379619

URL: http://svn.apache.org/viewvc?rev=1379619&view=rev
Log:
Start working on getting HTTPS working

Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnection.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFSSLConnectionFactory.java

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1379619&r1=1379618&r2=1379619&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Fri Aug 31 21:26:42 2012
@@ -28,13 +28,27 @@ import java.net.MalformedURLException;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.GeneralSecurityException;
+import java.security.Principal;
+import java.security.cert.Certificate;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.common.util.SystemPropertyAction;
+import org.apache.cxf.configuration.jsse.SSLUtils;
+import org.apache.cxf.configuration.jsse.TLSClientParameters;
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.io.CacheAndWriteOutputStream;
 import org.apache.cxf.message.Message;
@@ -42,11 +56,14 @@ import org.apache.cxf.message.MessageUti
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.Headers;
 import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
+import org.apache.cxf.transport.https.AliasedX509ExtendedKeyManager;
+import org.apache.cxf.transport.https.CertificateHostnameVerifier;
 import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.version.Version;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.http.Header;
+import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
 import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.entity.BasicHttpEntity;
@@ -97,11 +114,12 @@ public class AsyncHTTPConduit extends UR
         }
         if (o == null) {
             //o = !message.getExchange().isSynchronous();
-            o = "http".equals(s);
+            //o = "http".equals(s);
+            o = Boolean.TRUE;
         }
         if ("https".equals(s)) {
             //https doesn't work yet
-            o = Boolean.FALSE;
+            //o = Boolean.FALSE;
         }
         if (!MessageUtils.isTrue(o)) {
             message.put(USE_ASYNC, Boolean.FALSE);
@@ -114,7 +132,7 @@ public class AsyncHTTPConduit extends UR
         }
 
         message.put(USE_ASYNC, Boolean.TRUE);
-        
+        message.put("http.scheme", uri.getScheme());
         String httpRequestMethod = 
             (String)message.get(Message.HTTP_REQUEST_METHOD);
         if (httpRequestMethod == null) {
@@ -139,7 +157,7 @@ public class AsyncHTTPConduit extends UR
     protected OutputStream createOutputStream(Message message, 
                                               boolean needToCacheRequest, 
                                               boolean isChunking,
-                                              int chunkThreshold) {
+                                              int chunkThreshold) throws IOException {
         if (Boolean.TRUE.equals(message.get(USE_ASYNC))) {
             CXFHttpRequest entity = message.get(CXFHttpRequest.class);
             return new AsyncWrappedOutputStream(message,
@@ -147,7 +165,7 @@ public class AsyncHTTPConduit extends UR
                                                 isChunking,
                                                 chunkThreshold,
                                                 getConduitName(),
-                                                entity.getURI().toString());
+                                                entity.getURI());
             
         }
         return super.createOutputStream(message, needToCacheRequest, isChunking, chunkThreshold);
@@ -167,17 +185,21 @@ public class AsyncHTTPConduit extends UR
         // Objects for the response
         volatile HttpResponse httpResponse;
         volatile Exception exception;
+        volatile SSLSession session;
+
+        private Future<Boolean> connectionFuture;
 
+        private Object sessionLock = new Object();
         
         public AsyncWrappedOutputStream(Message message,
                                         boolean needToCacheRequest, 
                                         boolean isChunking,
                                         int chunkThreshold, 
                                         String conduitName,
-                                        String url) {
+                                        URI uri) {
             super(message, needToCacheRequest, isChunking,
                   chunkThreshold, conduitName,
-                  url);
+                  uri);
             csPolicy = getClient(message);
             entity = message.get(CXFHttpRequest.class);
             basicEntity = (BasicHttpEntity)entity.getEntity();
@@ -224,12 +246,34 @@ public class AsyncHTTPConduit extends UR
         }
 
         protected void handleNoOutput() throws IOException {
-            executeRequest(false);
+            connect(false);
+            outbuf.writeCompleted();
         }
         protected void setupWrappedStream() throws IOException {
-            executeRequest(true);
-        }        
-        protected void executeRequest(boolean output) throws IOException {
+            connect(true);
+            wrappedStream = new OutputStream() {
+                public void write(byte b[], int off, int len) throws IOException {
+                    outbuf.write(b, off, len);
+                }
+                public void write(int b) throws IOException {
+                    outbuf.write(b);
+                }
+                public void close() throws IOException {
+                    outbuf.writeCompleted();
+                }
+            };
+                        
+            // If we need to cache for retransmission, store data in a
+            // CacheAndWriteOutputStream. Otherwise write directly to the output stream.
+            if (cachingForRetransmission) {
+                cachedStream = new CacheAndWriteOutputStream(wrappedStream);
+                wrappedStream = cachedStream;
+            }
+        }
+        protected void connect(boolean output) throws IOException {
+            if (connectionFuture != null) {
+                return;
+            }
                        
             CXFResponseCallback responseCallback = new CXFResponseCallback() {
                 @Override
@@ -259,43 +303,19 @@ public class AsyncHTTPConduit extends UR
             //FIXME - what to do with a Proxy?
             //Proxy proxy = proxyFactory.createProxy(csPolicy , entity.getURI());
             
-            //FIXME - what to do for SSL/TLS?
-            //tlsClientParameters.*
-            
             
             if (!output) {
                 entity.removeHeaders("Transfer-Encoding");
                 entity.removeHeaders("Content-Type");
                 entity.setEntity(null);
             }
-            factory.getRequester().execute(
+            connectionFuture = factory.getRequester().execute(
                         csPolicy,
+                        this,
                         new CXFHttpAsyncRequestProducer(entity, outbuf),
                         new CXFHttpAsyncResponseConsumer(inbuf, responseCallback),
                         new BasicHttpContext(),
                         callback);
-            if (output) {
-                wrappedStream = new OutputStream() {
-                    public void write(byte b[], int off, int len) throws IOException {
-                        outbuf.write(b, off, len);
-                    }
-                    public void write(int b) throws IOException {
-                        outbuf.write(b);
-                    }
-                    public void close() throws IOException {
-                        outbuf.writeCompleted();
-                    }
-                };
-                            
-                // If we need to cache for retransmission, store data in a
-                // CacheAndWriteOutputStream. Otherwise write directly to the output stream.
-                if (cachingForRetransmission) {
-                    cachedStream = new CacheAndWriteOutputStream(wrappedStream);
-                    wrappedStream = cachedStream;
-                }
-            } else {
-                outbuf.writeCompleted();
-            }
         }
 
         protected synchronized void setHttpResponse(HttpResponse r) {
@@ -391,8 +411,46 @@ public class AsyncHTTPConduit extends UR
         }
         
         protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException {
-            //FIXME - need to get the TLS stuff from the connection
-            return null;
+            if ("http".equals(outMessage.get("http.scheme"))) {
+                return null;
+            }
+            connect(true);
+            synchronized (sessionLock) {
+                if (session == null) {
+                    try {
+                        sessionLock.wait(csPolicy.getConnectionTimeout());
+                    } catch (InterruptedException e) {
+                        throw new IOException(e);
+                    }
+                }
+            }
+            HostnameVerifier verifier;
+            if (tlsClientParameters.isUseHttpsURLConnectionDefaultHostnameVerifier()) {
+                verifier = HttpsURLConnection.getDefaultHostnameVerifier();
+            } else if (tlsClientParameters.isDisableCNCheck()) {
+                verifier = CertificateHostnameVerifier.ALLOW_ALL;
+            } else {
+                verifier = CertificateHostnameVerifier.DEFAULT;
+            }
+            if (!verifier.verify(url.getHost(), session)) {
+                throw new IOException("Could not verify host " + url.getHost());
+            }
+            
+            String method = (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
+            String cipherSuite = null;
+            Certificate[] localCerts = null;
+            Principal principal = null;
+            Certificate[] serverCerts = null;
+            Principal peer = null;
+            if (session != null) {
+                cipherSuite = session.getCipherSuite();
+                localCerts = session.getLocalCertificates();
+                principal = session.getLocalPrincipal();
+                serverCerts = session.getPeerCertificates();
+                peer = session.getPeerPrincipal();
+            }
+            
+            return new HttpsURLConnectionInfo(url, method, cipherSuite, localCerts, principal, serverCerts, peer);
         }
         
         protected synchronized int getResponseCode() throws IOException {
@@ -481,6 +539,7 @@ public class AsyncHTTPConduit extends UR
             httpResponse = null;
             isAsync = false;
             exception = null;
+            connectionFuture = null;
             
             try {
                 setupConnection(outMessage, new URI(newURL), csPolicy);
@@ -495,6 +554,57 @@ public class AsyncHTTPConduit extends UR
             outbuf = new SharedOutputBuffer(4096 * 4, allocator);
         }
         
-    }
+        public SSLContext getSSLContext() throws GeneralSecurityException {
+            if (tlsClientParameters == null) {
+                tlsClientParameters = new TLSClientParameters();
+            }
+            String provider = tlsClientParameters.getJsseProvider();
+    
+            String protocol = tlsClientParameters.getSecureSocketProtocol() != null ? tlsClientParameters
+                .getSecureSocketProtocol() : "TLS";
+    
+            SSLContext ctx = provider == null ? SSLContext.getInstance(protocol) : SSLContext
+                .getInstance(protocol, provider);
+            ctx.getClientSessionContext().setSessionTimeout(tlsClientParameters.getSslCacheTimeout());
+            KeyManager[] keyManagers = tlsClientParameters.getKeyManagers();
+            if (tlsClientParameters.getCertAlias() != null) {
+                getKeyManagersWithCertAlias(tlsClientParameters, keyManagers);
+            }
+            ctx.init(keyManagers, tlsClientParameters.getTrustManagers(),
+                     tlsClientParameters.getSecureRandom());
     
+            return ctx;
+        }
+        public void initializeSSLEngine(SSLContext ctx, SSLEngine sslengine) {
+            String[] cipherSuites = SSLUtils.getCiphersuites(tlsClientParameters.getCipherSuites(),
+                                                             SSLUtils.getSupportedCipherSuites(ctx), 
+                                                             tlsClientParameters.getCipherSuitesFilter(), LOG, false);
+            sslengine.setEnabledCipherSuites(cipherSuites);
+        }
+        protected void getKeyManagersWithCertAlias(TLSClientParameters tlsClientParameters,
+                                                   KeyManager[] keyManagers) throws GeneralSecurityException {
+            if (tlsClientParameters.getCertAlias() != null) {
+                for (int idx = 0; idx < keyManagers.length; idx++) {
+                    if (keyManagers[idx] instanceof X509KeyManager) {
+                        try {
+                            keyManagers[idx] = new AliasedX509ExtendedKeyManager(
+                                tlsClientParameters.getCertAlias(), (X509KeyManager)keyManagers[idx]);
+                        } catch (Exception e) {
+                            throw new GeneralSecurityException(e);
+                        }
+                    }
+                }
+            }
+        }
+    
+        public void verify(HttpHost host, SSLSession sslsession) {            
+            session = sslsession;
+            synchronized (sessionLock) {
+                sessionLock.notifyAll();
+            }
+        }
+    }
+
+
+
 }

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1379619&r1=1379618&r2=1379619&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Fri Aug 31 21:26:42 2012
@@ -156,7 +156,8 @@ public class AsyncHTTPConduitFactory imp
         t.start();
         
         requester = new CXFAsyncRequester(connManager, httpproc, 
-                new DefaultConnectionReuseStrategy(), params);
+                new DefaultConnectionReuseStrategy(), params,
+                sslConnFactory);
     }
     
     public CXFAsyncRequester getRequester() throws IOException {

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java?rev=1379619&r1=1379618&r2=1379619&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFAsyncRequester.java Fri Aug 31 21:26:42 2012
@@ -20,9 +20,13 @@
 package org.apache.cxf.transport.http.asyncclient;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit.AsyncWrappedOutputStream;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.http.ConnectionClosedException;
 import org.apache.http.ConnectionReuseStrategy;
@@ -44,7 +48,11 @@ import org.slf4j.LoggerFactory;
 public class CXFAsyncRequester {
 
     private static final Logger LOG = LoggerFactory.getLogger(CXFAsyncRequester.class);
+
     
+    private final Map<HttpHost, AsyncWrappedOutputStream> outstandingConnections 
+        = Collections.synchronizedMap(new IdentityHashMap<HttpHost, AsyncWrappedOutputStream>());
+
     private final CXFConnectionManager connManager;
     private final HttpProcessor httppocessor;
     private final ConnectionReuseStrategy reuseStrategy;
@@ -54,16 +62,19 @@ public class CXFAsyncRequester {
             final CXFConnectionManager connManager,
             final HttpProcessor httppocessor,
             final ConnectionReuseStrategy reuseStrategy,
-            final HttpParams params) {
+            final HttpParams params,
+            CXFSSLConnectionFactory sslConnFactory) {
         super();
         this.connManager = connManager;
         this.httppocessor = httppocessor;
         this.reuseStrategy = reuseStrategy;
         this.params = params;
+        sslConnFactory.setOutstandingConnectionMap(outstandingConnections);
     }
 
     public <T> Future<T> execute(
             final HTTPClientPolicy csPolicy,
+            final AsyncWrappedOutputStream asyncWrappedOutputStream,
             final HttpAsyncRequestProducer requestProducer,
             final HttpAsyncResponseConsumer<T> responseConsumer,
             final HttpContext context,
@@ -79,8 +90,13 @@ public class CXFAsyncRequester {
         }
         BasicFuture<T> future = new BasicFuture<T>(callback);
         HttpHost target = requestProducer.getTarget();
+        Object tlsCompat = null;
+        if ("https".equals(target.getSchemeName())) {
+            outstandingConnections.put(target, asyncWrappedOutputStream);
+            tlsCompat = new Object();
+        }
         this.connManager.leaseConnection(
-                target, null, 
+                target, tlsCompat, 
                 csPolicy.getConnectionTimeout(), TimeUnit.MILLISECONDS,
                 new ConnRequestCallback<T>(
                 future, requestProducer, responseConsumer, context));

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnection.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnection.java?rev=1379619&r1=1379618&r2=1379619&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnection.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnection.java Fri Aug 31 21:26:42 2012
@@ -46,17 +46,24 @@ class CXFConnection extends DefaultNHttp
 
     private final String id;
 
+    private IOSession iosession;
+
     public CXFConnection(
             final IOSession session,
             final HttpResponseFactory responseFactory,
             final ByteBufferAllocator allocator,
             final HttpParams params) {
         super(session, responseFactory, allocator, params);
+        iosession = session;
         this.id = "http-outgoing-" + COUNT.incrementAndGet();
         if (IOLOG.isDebugEnabled() || WIRELOG.isDebugEnabled()) {
             this.session = new LoggingIOSession(session, this.id);
         }
     }
+    
+    public IOSession getIOSession() {
+        return iosession;
+    }
 
     @Override
     public void close() throws IOException {

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java?rev=1379619&r1=1379618&r2=1379619&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFConnectionManager.java Fri Aug 31 21:26:42 2012
@@ -32,6 +32,7 @@ import org.apache.http.nio.NHttpClientCo
 import org.apache.http.nio.pool.NIOConnFactory;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
 import org.apache.http.nio.reactor.IOReactorStatus;
+import org.apache.http.nio.reactor.ssl.SSLIOSession;
 import org.apache.http.params.HttpParams;
 import org.apache.http.pool.PoolStats;
 import org.slf4j.Logger;
@@ -50,7 +51,18 @@ class CXFConnectionManager {
             final HttpParams params) {
         super();
         this.ioreactor = ioreactor;
-        this.pool = new BasicNIOConnPool(ioreactor, connFactory, params);
+        this.pool = new BasicNIOConnPool(ioreactor, connFactory, params) {
+            protected BasicNIOPoolEntry createEntry(HttpHost host, NHttpClientConnection conn) {
+                BasicNIOPoolEntry e = super.createEntry(host, conn);
+                CXFConnection cc = (CXFConnection)conn;
+                if (cc.getIOSession() instanceof SSLIOSession) {
+                    //Need a way to do comparisons on the TLS stuff
+                    //and also save the SSLEngine and such for use later
+                    e.setState(new Object());
+                }
+                return e;
+            }
+        };
     }
 
     @Override

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFSSLConnectionFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFSSLConnectionFactory.java?rev=1379619&r1=1379618&r2=1379619&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFSSLConnectionFactory.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFSSLConnectionFactory.java Fri Aug 31 21:26:42 2012
@@ -19,47 +19,79 @@
 
 package org.apache.cxf.transport.http.asyncclient;
 
+import java.security.GeneralSecurityException;
+import java.util.Map;
+
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+
 
+import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit.AsyncWrappedOutputStream;
+import org.apache.http.HttpHost;
 import org.apache.http.HttpResponseFactory;
 import org.apache.http.annotation.Immutable;
+import org.apache.http.impl.DefaultHttpResponseFactory;
 import org.apache.http.impl.nio.DefaultNHttpClientConnection;
-import org.apache.http.impl.nio.SSLNHttpClientConnectionFactory;
+import org.apache.http.nio.NHttpConnectionFactory;
 import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.ssl.SSLIOSession;
+import org.apache.http.nio.reactor.ssl.SSLMode;
 import org.apache.http.nio.reactor.ssl.SSLSetupHandler;
 import org.apache.http.nio.util.ByteBufferAllocator;
+import org.apache.http.nio.util.HeapByteBufferAllocator;
+import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
 
 @Immutable
-class CXFSSLConnectionFactory extends SSLNHttpClientConnectionFactory {
+class CXFSSLConnectionFactory
+    implements NHttpConnectionFactory<DefaultNHttpClientConnection> {
+    
+    private Map<HttpHost, AsyncWrappedOutputStream> outstandingConnections;
 
-    public CXFSSLConnectionFactory(
-            final SSLContext sslcontext,
-            final SSLSetupHandler sslHandler,
-            final HttpResponseFactory responseFactory,
-            final ByteBufferAllocator allocator,
-            final HttpParams params) {
-        super(sslcontext, sslHandler, responseFactory, allocator, params);
-    }
+    private final HttpResponseFactory responseFactory = new DefaultHttpResponseFactory();
+    private final ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    private final HttpParams params;
 
-    public CXFSSLConnectionFactory(
-            final SSLContext sslcontext,
-            final SSLSetupHandler sslHandler,
-            final HttpParams params) {
-        super(sslcontext, sslHandler, params);
+    public CXFSSLConnectionFactory(final HttpParams params) {
+        this.params = params;
     }
 
-    public CXFSSLConnectionFactory(final HttpParams params) {
-        super(params);
+    public synchronized DefaultNHttpClientConnection createConnection(IOSession session) {
+        final HttpHost host = (HttpHost)session.getAttribute("http.session.attachment");
+        final AsyncWrappedOutputStream stream = outstandingConnections.remove(host);
+        if (stream != null) {
+            //OK
+            try {
+                final SSLContext sslcontext = stream.getSSLContext();
+                SSLIOSession ssliosession 
+                    = new SSLIOSession(session, SSLMode.CLIENT, sslcontext, new SSLSetupHandler() {
+                        public void initalize(SSLEngine sslengine) throws SSLException {
+                            stream.initializeSSLEngine(sslcontext, sslengine);
+                            System.out.println("Initialized!!!!");
+                        }
+                        public void verify(IOSession iosession, SSLSession sslsession) throws SSLException {
+                            System.out.println("will verify!!!!");
+                            stream.verify(host, sslsession);
+                            System.out.println("done verify!!!!");
+                        }
+                    });
+                session.setAttribute(SSLIOSession.SESSION_KEY, ssliosession);
+                session = ssliosession;
+            } catch (GeneralSecurityException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        CXFConnection conn = new CXFConnection(session, responseFactory, allocator, params);
+        int timeout = HttpConnectionParams.getSoTimeout(this.params);
+        conn.setSocketTimeout(timeout);
+        return conn;
     }
 
-    @Override
-    protected DefaultNHttpClientConnection createConnection(
-            final IOSession session,
-            final HttpResponseFactory responseFactory, 
-            final ByteBufferAllocator allocator, 
-            final HttpParams params) {
-        return new CXFConnection(session, responseFactory, allocator, params);
+    
+    public void setOutstandingConnectionMap(Map<HttpHost, AsyncWrappedOutputStream> map) {
+        this.outstandingConnections = map;
     }
     
 }