You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2010/09/10 20:22:06 UTC

svn commit: r995917 - in /cxf/branches/async-client: api/src/main/java/org/apache/cxf/io/ api/src/main/java/org/apache/cxf/transport/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/ rt/transports/http/ rt/transports/http/src/main/java/org/...

Author: dkulp
Date: Fri Sep 10 18:22:05 2010
New Revision: 995917

URL: http://svn.apache.org/viewvc?rev=995917&view=rev
Log:
Some work for using http-client for some async client stuff

Added:
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java   (with props)
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java   (with props)
Modified:
    cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
    cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java
    cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java
    cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
    cxf/branches/async-client/rt/transports/http/pom.xml
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
    cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java
    cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java
    cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java

Modified: cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java (original)
+++ cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java Fri Sep 10 18:22:05 2010
@@ -35,6 +35,10 @@ public abstract class AbstractWrappedOut
     protected AbstractWrappedOutputStream() {
         super();
     }
+    protected AbstractWrappedOutputStream(OutputStream wrapped) {
+        super();
+        wrappedStream = wrapped;
+    }
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {

Modified: cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java (original)
+++ cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java Fri Sep 10 18:22:05 2010
@@ -105,6 +105,7 @@ public class TransportURIResolver extend
                     Message message = new MessageImpl();
                     Exchange exch = new ExchangeImpl();
                     message.setExchange(exch);
+                    exch.put(Bus.class, bus);
                     
                     message.put(Message.HTTP_REQUEST_METHOD, "GET");
                     c.setMessageObserver(new MessageObserver() {
@@ -116,11 +117,22 @@ public class TransportURIResolver extend
                                 c.close(message);
                             } catch (IOException e) {
                                 //ignore
+                                message.getExchange().put(Exception.class, e);
+                            } finally {
+                                synchronized (message.getExchange()) {
+                                    message.getExchange().notifyAll();
+                                }
                             }
                         }
                     });
-                    c.prepare(message);
-                    c.close(message);
+                    synchronized (exch) {
+                        c.prepare(message);
+                        c.close(message);
+                        if (exch.get(InputStream.class) == null
+                            && exch.get(Exception.class) == null) {
+                            exch.wait();
+                        }
+                    }
                     InputStream ins = exch.get(InputStream.class);
                     resourceOpened.addElement(ins);
                     InputSource src = new InputSource(ins);
@@ -132,6 +144,7 @@ public class TransportURIResolver extend
                 }
             } catch (Exception e) {
                 //ignore
+                e.printStackTrace();
             }
         }
         if (is == null 

Modified: cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java (original)
+++ cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java Fri Sep 10 18:22:05 2010
@@ -312,7 +312,7 @@ public class AbstractClient implements C
     protected ResponseBuilder setResponseBuilder(HttpURLConnection conn, Exchange exchange) throws Throwable {
         Message inMessage = exchange.getInMessage();
         if (conn == null) {
-            throw new WebApplicationException(); 
+            throw new WebApplicationException(Response.noContent().build()); 
         }
         Integer responseCode = (Integer)exchange.get(Message.RESPONSE_CODE);
         if (responseCode == null) {
@@ -578,6 +578,8 @@ public class AbstractClient implements C
                                     MultivaluedMap<String, String> headers,
                                     URI currentURI) {
         Message m = cfg.getConduitSelector().getEndpoint().getBinding().createMessage();
+        m.put("force.http.url.connection", Boolean.TRUE);
+        
         m.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
         m.put(Message.INBOUND_MESSAGE, Boolean.FALSE);
         

Modified: cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java (original)
+++ cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java Fri Sep 10 18:22:05 2010
@@ -640,6 +640,8 @@ public class WebClient extends AbstractC
             rb.entity(entity);
             
             return rb.build();
+        } catch (WebApplicationException ex) {
+            throw ex;
         } catch (Throwable ex) {
             throw new WebApplicationException(ex);
         }

Modified: cxf/branches/async-client/rt/transports/http/pom.xml
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/pom.xml?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/pom.xml (original)
+++ cxf/branches/async-client/rt/transports/http/pom.xml Fri Sep 10 18:22:05 2010
@@ -68,6 +68,17 @@
             <artifactId>${servlet-api.artifact}</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore-nio</artifactId>
+            <version>4.1-beta2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.1-alpha2</version>
+        </dependency>
     </dependencies>
 
     <build>

Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java Fri Sep 10 18:22:05 2010
@@ -28,6 +28,7 @@ import org.apache.cxf.common.injection.N
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.http.async.AsyncHTTPConduit;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
 @NoJSR250Annotations(unlessNull = "bus")
@@ -60,9 +61,15 @@ public class ClientOnlyHTTPTransportFact
             EndpointInfo endpointInfo,
             EndpointReferenceType target
     ) throws IOException {
+        /*
         HTTPConduit conduit = target == null
             ? new HTTPConduit(bus, endpointInfo)
             : new HTTPConduit(bus, endpointInfo, target);
+            */
+        HTTPConduit conduit = target == null
+            ? new AsyncHTTPConduit(bus, endpointInfo)
+            : new AsyncHTTPConduit(bus, endpointInfo, target);
+        
         // Spring configure the conduit.  
         String address = conduit.getAddress();
         if (address != null && address.indexOf('?') != -1) {

Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java Fri Sep 10 18:22:05 2010
@@ -26,7 +26,7 @@ import java.util.Map;
  * session state.
  *
  */
-class Cookie {
+public class Cookie {
     public static final String DISCARD_ATTRIBUTE = "discard";
     public static final String MAX_AGE_ATTRIBUTE = "max-age";
     public static final String PATH_ATTRIBUTE = "path";

Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Fri Sep 10 18:22:05 2010
@@ -154,6 +154,11 @@ public class HTTPConduit 
     public static final String KEY_HTTP_CONNECTION = "http.connection";
 
     /**
+     * The Logger for this class.
+     */
+    protected static final Logger LOG = LogUtils.getL7dLogger(HTTPConduit.class);
+
+    /**
      * This constant is the Message(Map) key for a list of visited URLs that
      * is used in redirect loop protection.
      */
@@ -165,10 +170,6 @@ public class HTTPConduit 
      */
     private static final String KEY_AUTH_URLS = "AuthURLs";
     
-    /**
-     * The Logger for this class.
-     */
-    private static final Logger LOG = LogUtils.getL7dLogger(HTTPConduit.class);
     
     /**
      * This constant holds the suffix ".http-conduit" that is appended to the 
@@ -189,7 +190,7 @@ public class HTTPConduit 
     /**
      *  This field holds a reference to the CXF bus associated this conduit.
      */
-    private final Bus bus;
+    protected final Bus bus;
 
     /**
      * This field is used for two reasons. First it provides the base name for
@@ -197,16 +198,16 @@ public class HTTPConduit 
      * address information, should it not be supplied in the Message Map, by the 
      * Message.ENDPOINT_ADDRESS property.
      */
-    private final EndpointInfo endpointInfo;
+    protected final EndpointInfo endpointInfo;
     
 
     /**
      * This field holds the "default" URL for this particular conduit, which
      * is created on demand.
      */
-    private URL defaultEndpointURL;
-    private String defaultEndpointURLString;
-    private boolean fromEndpointReferenceType;
+    protected URL defaultEndpointURL;
+    protected String defaultEndpointURLString;
+    protected boolean fromEndpointReferenceType;
 
     // Configurable values
     
@@ -215,37 +216,46 @@ public class HTTPConduit 
      * This field is injected via spring configuration based on the conduit 
      * name.
      */
-    private HTTPClientPolicy clientSidePolicy;
+    protected HTTPClientPolicy clientSidePolicy;
     
     /**
      * This field holds the password authorization configuration.
      * This field is injected via spring configuration based on the conduit 
      * name.
     */
-    private AuthorizationPolicy authorizationPolicy;
+    protected AuthorizationPolicy authorizationPolicy;
     
     /**
      * This field holds the password authorization configuration for the 
      * configured proxy. This field is injected via spring configuration based 
      * on the conduit name.
      */
-    private ProxyAuthorizationPolicy proxyAuthorizationPolicy;
+    protected ProxyAuthorizationPolicy proxyAuthorizationPolicy;
 
     /**
      * This field holds the configuration TLS configuration which
      * is programmatically configured. 
      */
-    private TLSClientParameters tlsClientParameters;
+    protected TLSClientParameters tlsClientParameters;
     
     /**
      * This field contains the MessageTrustDecider.
      */
-    private MessageTrustDecider trustDecider;
+    protected MessageTrustDecider trustDecider;
     
     /**
      * This field contains the HttpAuthSupplier.
      */
-    private HttpAuthSupplier authSupplier;
+    protected HttpAuthSupplier authSupplier;
+
+
+    /**
+     * Variables for holding session state if sessions are supposed to be maintained
+     */
+    protected Map<String, Cookie> sessionCookies = new ConcurrentHashMap<String, Cookie>();
+    protected boolean maintainSession;
+    
+    protected CertConstraints certConstraints;
 
     /**
      * This boolean signfies that that finalizeConfig is called, which is
@@ -254,15 +264,7 @@ public class HTTPConduit 
      * should be handled as such.
      */
     private boolean configFinalized;
-
-    /**
-     * Variables for holding session state if sessions are supposed to be maintained
-     */
-    private Map<String, Cookie> sessionCookies = new ConcurrentHashMap<String, Cookie>();
-    private boolean maintainSession;
     
-    private CertConstraints certConstraints;
-
     /**
      * Constructor
      * 
@@ -733,7 +735,7 @@ public class HTTPConduit 
      * 
      * @throws MalformedURLException
      */
-    private URL setupURL(Message message) throws MalformedURLException {
+    protected URL setupURL(Message message) throws MalformedURLException {
         String result = (String)message.get(Message.ENDPOINT_ADDRESS);
         String pathInfo = (String)message.get(Message.PATH_INFO);
         String queryString = (String)message.get(Message.QUERY_STRING);
@@ -828,7 +830,7 @@ public class HTTPConduit 
      * @param message The outbound message
      * @return The PROTOCOL_HEADERS map
      */
-    private Map<String, List<String>> getSetProtocolHeaders(Message message) {
+    protected Map<String, List<String>> getSetProtocolHeaders(Message message) {
         Map<String, List<String>> headers =
             CastUtils.cast((Map<?, ?>)message.get(Message.PROTOCOL_HEADERS));        
         if (null == headers) {
@@ -882,7 +884,7 @@ public class HTTPConduit 
      * @param level   The Logging Level.
      * @param headers The Message protocol headers.
      */
-    private void logProtocolHeaders(
+    protected void logProtocolHeaders(
         Level   level,
         Message message
     ) {
@@ -946,7 +948,7 @@ public class HTTPConduit 
      * 
      * @param exchange The exchange in question
      */
-    private boolean isOneway(Exchange exchange) {
+    protected final boolean isOneway(Exchange exchange) {
         return exchange != null && exchange.isOneWay();
     }
 
@@ -959,15 +961,16 @@ public class HTTPConduit 
      */
     protected static InputStream getPartialResponse(
         HttpURLConnection connection,
-        int responseCode
+        int responseCode,
+        Map<String, List<String>> headers
     ) throws IOException {
         InputStream in = null;
         if (responseCode == HttpURLConnection.HTTP_ACCEPTED
             || responseCode == HttpURLConnection.HTTP_OK) {
             if (connection.getContentLength() > 0) {
                 in = connection.getInputStream();
-            } else if (hasChunkedResponse(connection) 
-                       || hasEofTerminatedResponse(connection)) {
+            } else if (hasChunkedResponse(headers) 
+                       || hasEofTerminatedResponse(headers)) {
                 // ensure chunked or EOF-terminated response is non-empty
                 in = getNonEmptyContent(connection);        
             }
@@ -977,22 +980,36 @@ public class HTTPConduit 
     
     /**
      * @param connection the given HttpURLConnection
-     * @return true iff the connection has a chunked response pending
+     * @return true if the connection has a chunked response pending
      */
-    private static boolean hasChunkedResponse(HttpURLConnection connection) {
-        return HttpHeaderHelper.CHUNKED.equalsIgnoreCase(
-                   connection.getHeaderField(HttpHeaderHelper.TRANSFER_ENCODING));
+    protected static boolean hasChunkedResponse(Map<String, List<String>> headers) {
+        List<String> s = headers.get(HttpHeaderHelper.TRANSFER_ENCODING);
+        if (s != null) {
+            for (String s2 : s) {
+                if (HttpHeaderHelper.CHUNKED.equalsIgnoreCase(s2)) {
+                    return true;
+                }
+            }
+        }
+        return false;
     }
     
     /**
      * @param connection the given HttpURLConnection
-     * @return true iff the connection has a chunked response pending
+     * @return true if the connection should be closed at EOF
      */
-    private static boolean hasEofTerminatedResponse(
-        HttpURLConnection connection
+    protected static boolean hasEofTerminatedResponse(
+        Map<String, List<String>> headers
     ) {
-        return HttpHeaderHelper.CLOSE.equalsIgnoreCase(
-                   connection.getHeaderField(HttpHeaderHelper.CONNECTION));
+        List<String> s = headers.get(HttpHeaderHelper.CONNECTION);
+        if (s != null) {
+            for (String s2 : s) {
+                if (HttpHeaderHelper.CLOSE.equalsIgnoreCase(s2)) {
+                    return true;
+                }
+            }
+        }
+        return false;
     }
 
     /**
@@ -1058,7 +1075,7 @@ public class HTTPConduit 
      * @param message
      * @param headers
      */
-    private void setHeadersByAuthorizationPolicy(
+    protected void setHeadersByAuthorizationPolicy(
             Message message,
             URL url,
             Map<String, List<String>> headers
@@ -1129,7 +1146,7 @@ public class HTTPConduit 
             }
         }
     }
-    private static List<String> createMutableList(String val) {
+    public static List<String> createMutableList(String val) {
         return new ArrayList<String>(Arrays.asList(new String[] {val}));
     }
     /**
@@ -1199,7 +1216,7 @@ public class HTTPConduit 
      * @param url     The URL the message is going to.
      * @param headers The headers in the outgoing message.
      */
-    private void setHeadersByPolicy(
+    protected void setHeadersByPolicy(
         Message message,
         URL     url,
         Map<String, List<String>> headers
@@ -1454,7 +1471,6 @@ public class HTTPConduit 
             // it is meant to make it to the end. (Too bad that information
             // went to every URL along the way, but that's what the user 
             // wants!
-            // TODO: Make this issue a security release note.
             setHeadersByAuthorizationPolicy(message, url, headers);
             
             connection = retransmit(
@@ -1471,7 +1487,7 @@ public class HTTPConduit 
      * @param message The message where the Set of URLs is stored.
      * @return The modifiable set of URLs that were visited.
      */
-    private Set<String> getSetAuthoriationURLs(Message message) {
+    protected Set<String> getSetAuthoriationURLs(Message message) {
         @SuppressWarnings("unchecked")
         Set<String> authURLs = (Set<String>) message.get(KEY_AUTH_URLS);
         if (authURLs == null) {
@@ -1489,7 +1505,7 @@ public class HTTPConduit 
      * @param message The message where the Set is stored.
      * @return The modifiable set of URLs that were visited.
      */
-    private Set<String> getSetVisitedURLs(Message message) {
+    protected Set<String> getSetVisitedURLs(Message message) {
         @SuppressWarnings("unchecked")
         Set<String> visitedURLs = (Set<String>) message.get(KEY_VISITED_URLS);
         if (visitedURLs == null) {
@@ -1528,7 +1544,7 @@ public class HTTPConduit 
         
         URL currentURL = connection.getURL();
         
-        String realm = extractAuthorizationRealm(connection.getHeaderFields());
+        String realm = extractAuthorizationRealm(connection.getHeaderFields().get("WWW-Authenticate"));
         
         Set<String> authURLs = getSetAuthoriationURLs(message);
         
@@ -1660,10 +1676,9 @@ public class HTTPConduit 
      * @param headers The Http Response Headers
      * @return The realm, or null if it is non-existent.
      */
-    private String extractAuthorizationRealm(
-            Map<String, List<String>> headers
+    protected String extractAuthorizationRealm(
+            List<String> auth
     ) {
-        List<String> auth = headers.get("WWW-Authenticate");
         if (auth != null) {
             for (String a : auth) {
                 int idx = a.indexOf("realm=");
@@ -2111,10 +2126,19 @@ public class HTTPConduit 
             }
 
             
+            Map<String, List<String>> origHeaders = connection.getHeaderFields();
+            Map<String, List<String>> headers = 
+                new HashMap<String, List<String>>();
+            for (String key : connection.getHeaderFields().keySet()) {
+                if (key != null) {
+                    headers.put(HttpHeaderHelper.getHeaderKey(key), 
+                        origHeaders.get(key));
+                }
+            }
 
             InputStream in = null;
             if (isOneway(exchange)) {
-                in = getPartialResponse(connection, responseCode);
+                in = getPartialResponse(connection, responseCode, headers);
                 if (in == null) {
                     // oneway operation or decoupled MEP without 
                     // partial response
@@ -2133,15 +2157,6 @@ public class HTTPConduit 
             
             Message inMessage = new MessageImpl();
             inMessage.setExchange(exchange);
-            Map<String, List<String>> origHeaders = connection.getHeaderFields();
-            Map<String, List<String>> headers = 
-                new HashMap<String, List<String>>();
-            for (String key : connection.getHeaderFields().keySet()) {
-                if (key != null) {
-                    headers.put(HttpHeaderHelper.getHeaderKey(key), 
-                        origHeaders.get(key));
-                }
-            }
             
             inMessage.put(Message.PROTOCOL_HEADERS, headers);
             inMessage.put(Message.RESPONSE_CODE, responseCode);

Added: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java?rev=995917&view=auto
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java (added)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java Fri Sep 10 18:22:05 2010
@@ -0,0 +1,857 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.async;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.Cookie;
+import org.apache.cxf.transport.http.DigestAuthSupplier;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.https.CertConstraints;
+import org.apache.cxf.transport.https.CertConstraintsInterceptor;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.version.Version;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpTrace;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.entity.ConsumingNHttpEntity;
+import org.apache.http.nio.entity.ProducingNHttpEntity;
+
+/**
+ * 
+ */
+public class AsyncHTTPConduit extends HTTPConduit {
+    
+    public AsyncHTTPConduit(Bus b, EndpointInfo ei) throws IOException {
+        super(b, ei);
+    }
+    public AsyncHTTPConduit(Bus b, EndpointInfo ei, 
+                            EndpointReferenceType t) throws IOException {
+        super(b, ei, t);
+    }
+
+    public void prepare(Message message) throws IOException {
+        message.put(AsyncHTTPConduit.class, this);
+        Map<String, List<String>> headers = getSetProtocolHeaders(message);
+        
+        URL currentURL = setupURL(message);
+        MessageUtils.getContextualBoolean(message, "force.http.url.connection", false);
+        if (MessageUtils.getContextualBoolean(message, "force.http.url.connection", false)
+            || "https".equalsIgnoreCase(currentURL.getProtocol())) {
+            //delegate to the parent for any https things for now
+            super.prepare(message);
+            return;
+        }
+        
+        HTTPClientPolicy csPolicy = getClient(message);
+        message.put(HTTPClientPolicy.class.getName() + ".complete", csPolicy);
+        
+        // If the HTTP_REQUEST_METHOD is not set, the default is "POST".
+        String httpRequestMethod = 
+            (String)message.get(Message.HTTP_REQUEST_METHOD);        
+
+        boolean isChunking = false;
+        int chunkThreshold = 0;
+        boolean needToCacheRequest = false;
+        // We must cache the request if we have basic auth supplier
+        // without preemptive basic auth.
+        if (authSupplier != null) {
+            String auth = authSupplier.getPreemptiveAuthorization(
+                    this, currentURL, message);
+            if (auth == null || authSupplier.requiresRequestCaching()) {
+                needToCacheRequest = true;
+                isChunking = false;
+                LOG.log(Level.FINE,
+                        "Auth Supplier, but no Premeptive User Pass or Digest auth (nonce may be stale)"
+                        + " We must cache request.");
+            }
+            message.put("AUTH_VALUE", auth);
+        }
+        if (csPolicy.isAutoRedirect()) {
+            needToCacheRequest = true;
+            LOG.log(Level.FINE, "AutoRedirect is turned on.");
+        }
+        if (csPolicy.getMaxRetransmits() > 0) {
+            needToCacheRequest = true;
+            LOG.log(Level.FINE, "MaxRetransmits is set > 0.");
+        }
+        // DELETE does not work and empty PUTs cause misleading exceptions
+        // if chunking is enabled
+        // TODO : ensure chunking can be enabled for non-empty PUTs - if requested
+        if ((httpRequestMethod == null || "POST".equals(httpRequestMethod))
+            && csPolicy.isAllowChunking()) {
+            //TODO: The chunking mode be configured or at least some
+            // documented client constant.
+            //use -1 and allow the URL connection to pick a default value
+            isChunking = true;
+            chunkThreshold = csPolicy.getChunkingThreshold();
+            if (chunkThreshold <= 0) {
+                chunkThreshold = 64 * 1024;
+            }
+        }
+
+        //Do we need to maintain a session?
+        maintainSession = Boolean.TRUE.equals((Boolean)message.get(Message.MAINTAIN_SESSION));
+        
+        //If we have any cookies and we are maintaining sessions, then use them        
+        if (maintainSession && sessionCookies.size() > 0) {
+            List<String> cookies = null;
+            for (String s : headers.keySet()) {
+                if (HttpHeaderHelper.COOKIE.equalsIgnoreCase(s)) {
+                    cookies = headers.remove(s);
+                    break;
+                }
+            }
+            if (cookies == null) {
+                cookies = new ArrayList<String>();
+            } else {
+                cookies = new ArrayList<String>(cookies);
+            }
+            headers.put(HttpHeaderHelper.COOKIE, cookies);
+            for (Cookie c : sessionCookies.values()) {
+                cookies.add(c.requestCookieHeader());
+            }
+        }
+
+        
+        if (certConstraints != null) {
+            message.put(CertConstraints.class.getName(), certConstraints);
+            message.getInterceptorChain().add(CertConstraintsInterceptor.INSTANCE);
+        }
+
+        // Set the headers on the message according to configured 
+        // client side policy.
+        setHeadersByPolicy(message, currentURL, headers);
+        
+        WrappedOutputStream wout = new WrappedOutputStream(currentURL,
+                                                           message,
+                                                           isChunking,
+                                                           chunkThreshold,
+                                                           needToCacheRequest);
+        message.setContent(OutputStream.class, wout);
+        message.put(WrappedOutputStream.class, wout);
+    }
+    
+    
+    protected class WrappedOutputStream extends OutputStream 
+        implements ProducingNHttpEntity, ConsumingNHttpEntity {
+        
+        protected Message outMessage;
+        protected URL address;
+        protected String contentType;
+        protected LoadingByteArrayOutputStream outStream; 
+        protected CachedOutputStream requestStream;
+        protected boolean written;
+        protected boolean connected;
+        protected boolean closed;
+        protected HttpResponse response;
+        protected InputStream inStream;
+        private int chunkThreshold;
+        private boolean isChunking;
+        private int contentLength = -1;
+        private int responseCode;
+        private String responseMessage;
+        private int nretransmits;
+
+        protected WrappedOutputStream(
+                URL address,
+                Message m, 
+                boolean isChunking, 
+                int chunkThreshold, 
+                boolean needToCacheRequest
+        ) {
+            this.outMessage = m;
+            this.address = address;
+            this.chunkThreshold = chunkThreshold;
+            this.isChunking = isChunking;
+            if (needToCacheRequest || !isChunking) {
+                requestStream = new CachedOutputStream();
+            }
+            if (isChunking) {
+                outStream = new LoadingByteArrayOutputStream(64 * 1024);
+            }
+        }
+        public void setResponse(HttpResponse r) {
+            response = r;
+        }
+        
+        public void doConnection() throws IOException {
+            connected = true;
+            if (closed) {
+                contentLength = outStream == null ? -1 : outStream.size();
+                isChunking = false;
+            }
+            final String httpRequestMethod = 
+                (String)outMessage.get(Message.HTTP_REQUEST_METHOD);        
+
+            HttpUriRequest request = null;
+            URI uri2;
+            try {
+                uri2 = address.toURI();
+                uri2 = new URI(null, null, uri2.getPath(), uri2.getQuery(), uri2.getFragment());
+            } catch (URISyntaxException e) {
+                throw new IOException(e.getMessage());
+            }
+            
+            final URI uri = uri2;
+            if (httpRequestMethod == null || "POST".equals(httpRequestMethod)) {
+                HttpPost post = new HttpPost(uri);
+                request = post;
+                post.setEntity(this);
+            } else if ("PUT".equals(httpRequestMethod)) {
+                HttpPut post = new HttpPut(uri);
+                request = post;
+                post.setEntity(this);
+            } else if ("GET".equals(httpRequestMethod)) {
+                request = new HttpGet(uri);
+            } else if ("DELETE".equals(httpRequestMethod)) {
+                request = new HttpDelete(uri);
+            } else if ("HEAD".equals(httpRequestMethod)) {
+                request = new HttpHead(uri);
+            } else if ("TRACE".equals(httpRequestMethod)) {
+                request = new HttpTrace(uri);
+            } else if ("OPTIONS".equals(httpRequestMethod)) {
+                request = new HttpOptions(uri);
+            } else {
+                HttpRequestBase r = new HttpRequestBase() {
+                    public String getMethod() {
+                        return httpRequestMethod;
+                    }
+                };
+                r.setURI(uri);
+                request = r;
+            }
+            setURLRequestHeaders(request);
+            
+            HttpClientController.getHttpClientController(outMessage)
+                .execute(AsyncHTTPConduit.this,
+                         address,
+                         request,
+                         outMessage);
+        }    
+        public boolean handleRetransmits(HttpResponse r) {
+            return false;
+        }
+        public synchronized boolean writeTo(ContentEncoder out) throws IOException {
+            if (isChunking || requestStream == null) {
+                if (outStream.size() > 0) {
+                    ByteBuffer buf = ByteBuffer.wrap(outStream.getRawBytes(), 
+                                                     0, outStream.size());
+                    int size = outStream.size();
+                    int i = out.write(buf);
+                    while (i < size) {
+                        int i2 =  out.write(buf);
+                        i += i2;
+                    }
+                    
+                    outStream.reset();
+                    notifyAll();
+                }
+            } else if (requestStream != null) {
+                InputStream in = requestStream.getInputStream();
+                byte bytes[] = new byte[8192];
+                ByteBuffer buffer = ByteBuffer.wrap(bytes);
+                try {
+                    int len = in.read(bytes);
+                    while (len != -1) {
+                        buffer.limit(len);
+                        int i = out.write(buffer);
+                        while (i < len) {
+                            int i2 =  out.write(buffer);
+                            i += i2;
+                        }
+                        buffer.clear();
+                        len = in.read(bytes);
+                    }
+                } finally {
+                    in.close();
+                }
+            }
+            return closed;
+        }
+
+        private void setURLRequestHeaders(HttpUriRequest request) throws IOException {
+            String ct  = (String)outMessage.get(Message.CONTENT_TYPE);
+            String enc = (String)outMessage.get(Message.ENCODING);
+
+            if (null != ct) {
+                if (enc != null 
+                    && ct.indexOf("charset=") == -1
+                    && !ct.toLowerCase().contains("multipart/related")) {
+                    ct = ct + "; charset=" + enc;
+                }
+            } else if (enc != null) {
+                ct = "text/xml; charset=" + enc;
+            } else {
+                ct = "text/xml";
+            }
+            request.setHeader(HttpHeaderHelper.CONTENT_TYPE, ct);
+            contentType = ct; 
+            
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Sending "
+                    + request.getMethod() 
+                    + " Message with Headers to " 
+                               + request.getURI()
+                    + " Conduit :"
+                    + getConduitName()
+                    + "\nContent-Type: " + ct + "\n");
+                logProtocolHeaders(Level.FINE, outMessage);
+            }
+            Map<String, List<String>> headers = getSetProtocolHeaders(outMessage);
+            for (String header : headers.keySet()) {
+                List<String> headerList = headers.get(header);
+                if (HttpHeaderHelper.CONTENT_TYPE.equalsIgnoreCase(header)) {
+                    continue;
+                }
+                if (HttpHeaderHelper.COOKIE.equalsIgnoreCase(header)) {
+                    for (String s : headerList) {
+                        request.addHeader(HttpHeaderHelper.COOKIE, s);
+                    }
+                } else {
+                    StringBuilder b = new StringBuilder();
+                    for (int i = 0; i < headerList.size(); i++) {
+                        b.append(headerList.get(i));
+                        if (i + 1 < headerList.size()) {
+                            b.append(',');
+                        }
+                    }
+                    request.addHeader(header, b.toString());
+                }
+            }
+            if (request.getFirstHeader("User-Agent") == null) {
+                request.addHeader("User-Agent", Version.getCompleteVersionString());
+            }
+        }
+
+        private InputStream getNonEmptyContent(HttpResponse resp) {
+            InputStream in = null;
+            try {
+                PushbackInputStream pin = 
+                    new PushbackInputStream(resp.getEntity().getContent());
+                int c = pin.read();
+                if (c != -1) {
+                    pin.unread((byte)c);
+                    in = pin;
+                }
+            } catch (IOException ioe) {
+                //ignore
+            }    
+            return in;
+        }
+        protected InputStream getPartialResponse(HttpResponse r,
+                                                 Map<String, List<String>> headers)
+            throws IOException {
+            
+            InputStream in = null;
+            if (responseCode == HttpURLConnection.HTTP_ACCEPTED
+                || responseCode == HttpURLConnection.HTTP_OK) {
+                int cl = -1;
+                List<String> cls = headers.get("content-length");
+                if (cls != null) {
+                    cl = Integer.parseInt(cls.get(0));
+                }
+                if (cl > 0) {
+                    in = r.getEntity().getContent();
+                } else if (hasChunkedResponse(headers) 
+                           || hasEofTerminatedResponse(headers)) {
+                    // ensure chunked or EOF-terminated response is non-empty
+                    in = getNonEmptyContent(r);        
+                }
+            }
+            return in;
+        }
+        private Map<String, List<String>> mapResponseHeaders() {
+            Map<String, List<String>> headers = 
+                new HashMap<String, List<String>>();
+            for (Header header : response.getAllHeaders()) {
+                String val = header.getValue();
+                List<String> v = headers.get(HttpHeaderHelper.getHeaderKey(header.getName()));
+                if (v == null) {
+                    v = new ArrayList<String>(2);
+                    headers.put(HttpHeaderHelper.getHeaderKey(header.getName()), v);
+                }
+                v.add(val);
+            }
+            return headers;
+        }
+        
+        protected void handleResponseInternal() throws IOException {
+            // Process retransmits until we fall out.
+            responseCode = response.getStatusLine().getStatusCode();
+            responseMessage = response.getStatusLine().getReasonPhrase();
+
+            try {
+                if (handleRetransmits()) {
+                    return;
+                }
+            } catch (IOException ioex) {
+                //need to propogate the exception
+                Exception ex = new Fault(ioex);
+                outMessage.getExchange().put(Exception.class, ex);
+                Message inMessage = new MessageImpl();
+                inMessage.setExchange(outMessage.getExchange());
+                inMessage.setContent(Exception.class, ex);
+                incomingObserver.onMessage(inMessage);
+                response.getEntity().getContent().close();
+                return;
+            }
+            if (requestStream != null) {
+                //don't need to retransmit, make sure it gets cleaned up
+                requestStream.close();
+            }
+            
+            Exchange exchange = outMessage.getExchange();
+            synchronized (this) {
+                notifyAll();
+            }
+            if (outMessage != null && exchange != null) {
+                exchange.put(Message.RESPONSE_CODE, responseCode);
+            }
+            
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Response Code: " 
+                        + responseCode
+                        + " Conduit: " + getConduitName());
+                Header[] headerFields = response.getAllHeaders();
+                if (null != headerFields) {
+                    StringBuilder buf = new StringBuilder();
+                    buf.append("Header fields: ");
+                    buf.append(System.getProperty("line.separator"));
+                    for (Header h : headerFields) {
+                        buf.append("    ");
+                        buf.append(h.getName());
+                        buf.append(": ");
+                        buf.append(h.getValue());
+                        buf.append(System.getProperty("line.separator"));
+                    }
+                    LOG.fine(buf.toString());
+                }
+            }
+            Message inMessage = new MessageImpl();
+            inMessage.setExchange(exchange);
+
+            if (responseCode == HttpURLConnection.HTTP_NOT_FOUND
+                && !MessageUtils.isTrue(outMessage.getContextualProperty(
+                    "org.apache.cxf.http.no_io_exceptions"))) {
+                
+                Exception ex = new Fault(new IOException("HTTP response '" + responseCode + ": " 
+                                + response.getStatusLine().getReasonPhrase() + "'"));
+                exchange.put(Exception.class, ex);
+                inMessage.setContent(Exception.class, ex);
+                incomingObserver.onMessage(inMessage);
+                response.getEntity().getContent().close();
+                return;
+            }
+
+            Map<String, List<String>> headers = mapResponseHeaders();
+            InputStream in = null;
+            if (isOneway(exchange)) {
+                in = getPartialResponse(response, headers);
+                if (in == null) {
+                    // oneway operation or decoupled MEP without 
+                    // partial response
+                    response.getEntity().getContent().close();
+                    return;
+                }
+            } else {
+                //not going to be resending or anything, clear out the stuff in the out message
+                //to free memory
+                /*
+                outMessage.removeContent(OutputStream.class);
+                if (cachingForRetransmission && cachedStream != null) {
+                    cachedStream.close();
+                }
+                cachedStream = null;
+                */
+            }
+            
+
+            
+            inMessage.put(Message.PROTOCOL_HEADERS, headers);
+            inMessage.put(Message.RESPONSE_CODE, responseCode);
+            List<String> ctList = headers.get("content-type");
+            if (ctList !=  null) {
+                String ct = null;
+                for (String s : ctList) {
+                    if (ct == null) {
+                        ct = s;
+                    } else {
+                        ct += "; " + s;
+                    }
+                }
+                inMessage.put(Message.CONTENT_TYPE, ct);
+                String charset = HttpHeaderHelper.findCharset(ct);
+                String normalizedEncoding = HttpHeaderHelper.mapCharset(charset);
+                if (normalizedEncoding == null) {
+                    String m = new org.apache.cxf.common.i18n.Message("INVALID_ENCODING_MSG",
+                                                                       LOG, charset).toString();
+                    LOG.log(Level.WARNING, m);
+                    throw new IOException(m);   
+                } 
+                inMessage.put(Message.ENCODING, normalizedEncoding);            
+            }
+                        
+            if (maintainSession) {
+                List<String> cookies = headers.get("Set-Cookie");
+                Cookie.handleSetCookie(sessionCookies, cookies);
+            }
+            if (responseCode != HttpURLConnection.HTTP_NOT_FOUND
+                && in == null) {
+                in = response.getEntity().getContent();
+            }
+            // if (in == null) : it's perfectly ok for non-soap http services
+            // have no response body : those interceptors which do need it will check anyway        
+            inMessage.setContent(InputStream.class, in);
+            incomingObserver.onMessage(inMessage);
+        }
+
+        private boolean handleRetransmits() throws IOException {
+            if (requestStream != null) {
+                HTTPClientPolicy policy = getClient(outMessage);
+                // Default MaxRetransmits is -1 which means unlimited.
+                int maxRetransmits = (policy == null)
+                                     ? -1
+                                     : policy.getMaxRetransmits();
+                
+                // MaxRetransmits of zero means zero.
+                if (maxRetransmits == 0) {
+                    return false;
+                }
+                if (maxRetransmits != -1 && nretransmits >= maxRetransmits) {
+                    return false;
+                }
+                if (processRetransmit(outMessage)) {
+                    ++nretransmits;
+                    return true;
+                }
+            }
+            return false;
+        }
+        private boolean processRetransmit(Message message) throws IOException {
+
+            if ((message != null) && (message.getExchange() != null)) {
+                message.getExchange().put(Message.RESPONSE_CODE, responseCode);
+            }
+            
+            // Process Redirects first.
+            switch(responseCode) {
+            case HttpURLConnection.HTTP_MOVED_PERM:
+            case HttpURLConnection.HTTP_MOVED_TEMP:
+                return redirectRetransmit();
+            case HttpURLConnection.HTTP_UNAUTHORIZED:
+                return authorizationRetransmit();
+            default:
+                break;
+            }
+            return false;
+        }
+        private boolean authorizationRetransmit() throws IOException {
+            // If we don't have a dynamic supply of user pass, then
+            // we don't retransmit. We just die with a Http 401 response.
+            if (authSupplier == null) {
+                String auth = response.getFirstHeader("WWW-Authenticate").getValue();
+                if (auth.startsWith("Digest ")) {
+                    authSupplier = new DigestAuthSupplier();
+                } else {
+                    return false;
+                }
+            }
+            
+            URL currentURL = address;
+            
+            Header heads[] = response.getHeaders("WWW-Authenticate");
+            List<String> auth = new ArrayList<String>();
+            if (heads != null) {
+                for (Header h : heads) {
+                    auth.add(h.getValue());
+                }
+            }
+            String realm = extractAuthorizationRealm(auth);
+            
+            Set<String> authURLs = getSetAuthoriationURLs(outMessage);
+            
+            // If we have been here (URL & Realm) before for this particular message
+            // retransmit, it means we have already supplied information
+            // which must have been wrong, or we wouldn't be here again.
+            // Otherwise, the server may be 401 looping us around the realms.
+            if (authURLs.contains(currentURL.toString() + realm)) {
+                if (LOG.isLoggable(Level.INFO)) {
+                    LOG.log(Level.INFO, "Authorization loop detected on Conduit \""
+                        + getConduitName()
+                        + "\" on URL \""
+                        + "\" with realm \""
+                        + realm
+                        + "\"");
+                }
+                        
+                throw new IOException("Authorization loop detected on Conduit \"" 
+                                      + getConduitName() 
+                                      + "\" on URL \""
+                                      + "\" with realm \""
+                                      + realm
+                                      + "\"");
+            }
+            
+            String up = 
+                authSupplier.getAuthorizationForRealm(
+                    AsyncHTTPConduit.this, currentURL, outMessage,
+                    realm, response.getFirstHeader("WWW-Authenticate").getValue());
+            
+            // No user pass combination. We give up.
+            if (up == null) {
+                return false;
+            }
+            
+            // Register that we have been here before we go.
+            authURLs.add(currentURL.toString() + realm);
+            
+            Map<String, List<String>> headers = getSetProtocolHeaders(outMessage);
+            headers.put("Authorization",
+                        createMutableList(up));
+            inStream = null;
+            doConnection();
+            return true;
+        }
+        private boolean redirectRetransmit() throws IOException {
+            // If we are not redirecting by policy, then we don't.
+            if (!getClient(outMessage).isAutoRedirect()) {
+                return false;
+            }
+            // We keep track of the redirections for redirect loop protection.
+            Set<String> visitedURLs = getSetVisitedURLs(outMessage);
+            visitedURLs.add(address.toString());
+            
+            String newURL = response.getFirstHeader("Location").getValue();
+            if (newURL != null) {
+                if (visitedURLs.contains(newURL)) {
+                    throw new IOException("Redirect loop detected on Conduit \"" 
+                                          + getConduitName() 
+                                          + "\" on '" 
+                                          + newURL
+                                          + "'");
+                }
+                inStream = null;
+                
+                address = new URL(newURL);
+                // We are going to redirect.
+                // Remove any Server Authentication Information for the previous
+                // URL.
+                Map<String, List<String>> headers = 
+                                    getSetProtocolHeaders(outMessage);
+                headers.remove("Authorization");
+                headers.remove("Proxy-Authorization");
+                
+                // If user configured this Conduit with preemptive authorization
+                // it is meant to make it to the end. (Too bad that information
+                // went to every URL along the way, but that's what the user 
+                // wants!
+                setHeadersByAuthorizationPolicy(outMessage, address, headers);
+                doConnection();
+                return true;
+            }
+            return false;
+        }
+        
+        public boolean isRepeatable() {
+            return false;
+        }
+
+        public boolean isChunked() {
+            return this.isChunking;
+        }
+
+        public long getContentLength() {
+            return contentLength;
+        }
+
+        public Header getContentType() {
+            return new BasicHeader(Message.CONTENT_TYPE, contentType);
+        }
+
+        public Header getContentEncoding() {
+            return null;
+        }
+
+        public InputStream getContent() throws IOException, IllegalStateException {
+            return inStream;
+        }
+
+        public boolean isStreaming() {
+            return true;
+        }
+
+        public void consumeContent() throws IOException {
+        }
+
+        public void produceContent(final ContentEncoder encoder, 
+                                   IOControl ioctrl) throws IOException {
+            if (writeTo(encoder)) {
+                encoder.complete();
+            }
+        }
+
+        public void finish() throws IOException {
+        }
+        
+        public synchronized void write(byte[] b, int off, int len) throws IOException {
+            if (!connected
+                && isChunking 
+                && ((outStream.size() + len) > this.chunkThreshold)) {
+                doConnection();
+            }
+            written = true;
+            if (isChunking) {
+                if (outStream.size() > 0 
+                    && ((outStream.size() + len) > (64 * 1024))) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                outStream.write(b, off, len);
+            }
+            if (requestStream != null) {
+                requestStream.write(b, off, len);
+            }
+        }
+        public synchronized void write(int b) throws IOException {
+            if (!connected
+                && isChunking 
+                && ((outStream.size() + 1) > this.chunkThreshold)) {
+                doConnection();
+            }
+            written = true;
+            if (isChunking) {
+                outStream.write(b);
+            }
+            if (requestStream != null) {
+                requestStream.write(b);
+            }
+        }
+        public synchronized void close() throws IOException {
+            closed = true;
+            if (!connected) {
+                doConnection();
+            }
+            if (isOneway(outMessage.getExchange())) {
+                //for a one way, we at least need to wait for the response code
+                while (responseCode == 0) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                if (responseCode != HttpURLConnection.HTTP_ACCEPTED
+                    && responseCode != HttpURLConnection.HTTP_OK) {
+                    throw new IOException(responseMessage);
+                }
+            }
+        }
+
+        public void consumeContent(final ContentDecoder decoder, IOControl ioctrl) throws IOException {
+            if (inStream != null) {
+                return;
+            }
+            inStream = new InputStream() {
+                public int read(byte b[], int off, int len) throws IOException {
+                    if (decoder.isCompleted()) {
+                        return -1;
+                    }
+                    ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+                    int i = decoder.read(buf);
+                    while (i == 0) {
+                        i = decoder.read(buf);
+                    }
+                    return i;
+                }
+
+                public int read() throws IOException {
+                    if (decoder.isCompleted()) {
+                        return -1;
+                    }
+                    byte[] bytes = new byte[1];
+                    int i = read(bytes, 0, 1);
+                    if (i == -1) {
+                        return -1;
+                    }
+                    i = bytes[0];
+                    i &= 0xFF;
+                    return i;
+                }
+                public void close() throws IOException {
+                }
+            };
+            this.handleResponseInternal();
+        }
+        public void writeTo(OutputStream outstream) throws IOException {
+            throw new UnsupportedOperationException("Not used for NIO");
+        }
+    }
+
+
+    protected void sendException(Message m, Exception exception) {
+        Message m2 = new MessageImpl();
+        m2.setExchange(m.getExchange());
+        m2.setContent(Exception.class, exception);
+        m.getExchange().put(Exception.class, exception);
+        this.incomingObserver.onMessage(m2);
+    }
+    
+}

Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java?rev=995917&view=auto
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java (added)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java Fri Sep 10 18:22:05 2010
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.http.async;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.async.AsyncHTTPConduit.WrappedOutputStream;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.version.Version;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.nio.NHttpClientIOTarget;
+import org.apache.http.nio.NHttpConnection;
+import org.apache.http.nio.entity.ConsumingNHttpEntity;
+import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
+import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.SessionRequest;
+import org.apache.http.nio.reactor.SessionRequestCallback;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.params.CoreProtocolPNames;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.protocol.RequestUserAgent;
+
+public class HttpClientController implements BusLifeCycleListener,
+    NHttpRequestExecutionHandler {
+    ConnectingIOReactor ioReactor;
+    
+    HttpClientController() {
+    }
+    
+    public void setUp() {
+        try {
+            HttpParams params = new BasicHttpParams();
+            params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 60000)
+                .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000)
+                .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
+                .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
+                .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
+                .setParameter(CoreProtocolPNames.USER_AGENT, Version.getCompleteVersionString());
+            ioReactor = new DefaultConnectingIOReactor(4, params);
+            
+            
+            BasicHttpProcessor httpproc = new BasicHttpProcessor();
+            httpproc.addInterceptor(new RequestContent());
+            httpproc.addInterceptor(new RequestTargetHost());
+            httpproc.addInterceptor(new RequestConnControl());
+            httpproc.addInterceptor(new RequestUserAgent());
+            httpproc.addInterceptor(new RequestExpectContinue());
+            
+            AsyncNHttpClientHandler handler = new AsyncNHttpClientHandler(
+                httpproc,
+                this,
+                new DefaultConnectionReuseStrategy(),
+                params) {
+                protected void handleTimeout(final NHttpConnection conn) {
+                    super.handleTimeout(conn);
+                    Message m = (Message)conn.getContext().getAttribute("MESSAGE");
+                    m.get(AsyncHTTPConduit.class).sendException(m, new SocketTimeoutException());
+                }
+            };
+
+            
+            final IOEventDispatch ioEventDispatch 
+                = new DefaultClientIOEventDispatch(handler, params) {
+                    protected NHttpClientIOTarget createConnection(IOSession session) {
+                        Message m = (Message)session.getAttribute(IOSession.ATTACHMENT_KEY);
+                        HTTPClientPolicy client = (HTTPClientPolicy)m
+                            .get(HTTPClientPolicy.class.getName() + ".complete");
+                        if (client != null) {
+                            session.setSocketTimeout((int)client.getReceiveTimeout());
+                        }
+                        return super.createConnection(session);
+                    }
+                };
+            
+            Thread t = new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        ioReactor.execute(ioEventDispatch);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            t.setDaemon(true);
+            t.start();
+
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+    public void execute(AsyncHTTPConduit conduit, 
+                        final URL address, 
+                        HttpUriRequest request, 
+                        final Message message) throws IOException {
+        int port = address.getPort();
+        if (port == -1) {
+            port = 80;
+        }
+        InetSocketAddress add = new InetSocketAddress(address.getHost(), port);
+        message.put(HttpUriRequest.class, request);
+        synchronized (message) {
+            SessionRequest req 
+                = ioReactor.connect(add, null, message,
+                    new SessionRequestCallback() {
+                        public void completed(SessionRequest request) {
+                            synchronized (message) {
+                                message.notifyAll();
+                            }
+                        }
+
+                        public void failed(SessionRequest request) {
+                            message.put(IOException.class, 
+                                        new ConnectException("Failed to connect to " + address));
+                            synchronized (message) {
+                                message.notifyAll();
+                            }
+                        }
+
+                        public void timeout(SessionRequest request) {
+                            message.put(IOException.class, 
+                                        new ConnectException("Failed to connect to " + address));
+                            synchronized (message) {
+                                message.notifyAll();
+                            }
+                        }
+
+                        public void cancelled(SessionRequest request) {
+                            message.put(IOException.class, 
+                                        new ConnectException("Failed to connect to " + address));
+                            synchronized (message) {
+                                message.notifyAll();
+                            }
+                        }
+                    });
+            HTTPClientPolicy client 
+                = (HTTPClientPolicy)message.get(HTTPClientPolicy.class.getName() + ".complete");
+            req.setConnectTimeout((int)client.getConnectionTimeout());
+            
+            try {
+                message.wait();
+                if (message.get(IOException.class) != null) {
+                    throw message.get(IOException.class);
+                }
+            } catch (InterruptedException e) {
+                //ignore
+            }
+            message.put(SessionRequest.class, req);
+        }
+    }
+    
+    public static HttpClientController getHttpClientController(Message message) {
+        Bus bus = message.getExchange() == null ? null : message.getExchange().getBus();
+        if (bus == null) {
+            bus = BusFactory.getThreadDefaultBus();
+        }
+        HttpClientController stuff = bus.getExtension(HttpClientController.class);
+        if (stuff == null) {
+            stuff = registerHttpClient(bus);
+        }
+        return stuff;
+    }
+    static synchronized HttpClientController registerHttpClient(Bus bus) {
+        HttpClientController stuff = bus.getExtension(HttpClientController.class);
+        if (stuff == null) {
+            stuff = new HttpClientController();
+            stuff.setUp();
+            bus.setExtension(stuff, HttpClientController.class);
+            bus.getExtension(BusLifeCycleManager.class).registerLifeCycleListener(stuff);
+        }
+        return stuff;
+    }
+
+    public void initComplete() {
+    }
+    public void preShutdown() {
+    }
+    public void postShutdown() {
+    }
+
+    public void initalizeContext(HttpContext context, Object attachment) {
+        context.setAttribute("MESSAGE", attachment);
+    }
+
+    public HttpRequest submitRequest(HttpContext context) {
+        Message m = (Message)context.getAttribute("MESSAGE");
+        if (m == null) {
+            return null;
+        }
+        return m.get(HttpUriRequest.class);
+    }
+
+    public ConsumingNHttpEntity responseEntity(HttpResponse response, HttpContext context)
+        throws IOException {
+        Message m = (Message)context.getAttribute("MESSAGE");
+        WrappedOutputStream out = m.get(WrappedOutputStream.class);
+        out.setResponse(response);
+        return out;
+    }
+
+    public void handleResponse(HttpResponse response, HttpContext context) throws IOException {
+    }
+
+    public void finalizeContext(HttpContext context) {
+        context.removeAttribute("MESSAGE");
+    }
+    
+}
\ No newline at end of file

Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java (original)
+++ cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java Fri Sep 10 18:22:05 2010
@@ -379,8 +379,11 @@ public class ClientServerTest extends Ab
         long before = System.currentTimeMillis();
 
         long delay = 3000;
+        //System.out.println(System.currentTimeMillis());
         Response<GreetMeLaterResponse> r1 = greeter.greetMeLaterAsync(delay);
+        //System.out.println(System.currentTimeMillis());
         Response<GreetMeLaterResponse> r2 = greeter.greetMeLaterAsync(delay);
+        //System.out.println(System.currentTimeMillis());
 
         long after = System.currentTimeMillis();
 
@@ -403,6 +406,7 @@ public class ClientServerTest extends Ab
             }
             waited += 500;
         }
+        //Thread.sleep(100000000);
         assertTrue("Response is  not available.", r1.isDone());
         assertTrue("Response is  not available.", r2.isDone());
     }
@@ -680,7 +684,10 @@ public class ClientServerTest extends Ab
                 BindingProvider bp = (BindingProvider)greeter;
                 Map<String, Object> responseContext = bp.getResponseContext();
                 String contentType = (String) responseContext.get(Message.CONTENT_TYPE);
-                assertEquals("text/xml;charset=utf-8", contentType.toLowerCase());
+                
+                
+                assertTrue(contentType.toLowerCase().contains("text/xml"));
+                assertTrue(contentType.toLowerCase().contains("charset=utf-8"));
                 Integer responseCode = (Integer) responseContext.get(Message.RESPONSE_CODE);
                 assertEquals(500, responseCode.intValue());                
                 assertNotNull(brlf.getFaultInfo());

Modified: cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java (original)
+++ cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java Fri Sep 10 18:22:05 2010
@@ -127,6 +127,7 @@ public class ClientServerSwaTest extends
         String string = IOUtils.newStringFromBytes(b);
         assertEquals("testfoobar", string);
         assertEquals("Hi", textHolder.value);
+        bis.close();
     }
     
     @Test
@@ -157,6 +158,7 @@ public class ClientServerSwaTest extends
         assertEquals("testfoobar", string);
         assertEquals("Hi", textHolder.value);
         assertEquals("Header", headerHolder.value);
+        bis.close();
     }
     
     @Test
@@ -184,6 +186,7 @@ public class ClientServerSwaTest extends
         bis.read(b, 0, 10);
         String string = IOUtils.newStringFromBytes(b);
         assertEquals("testfoobar", string);
+        bis.close();
     }
     
     @Test
@@ -224,10 +227,14 @@ public class ClientServerSwaTest extends
                                                                  attach5);
         
         assertNotNull(response);
-        Map<?, ?> map = CastUtils.cast((Map<?, ?>)((BindingProvider)port).getResponseContext()
+        Map<String, DataHandler> map 
+            = CastUtils.cast((Map<?, ?>)((BindingProvider)port).getResponseContext()
                                            .get(MessageContext.INBOUND_MESSAGE_ATTACHMENTS));
         assertNotNull(map);
         assertEquals(5, map.size()); 
+        for (Map.Entry<String, DataHandler> ent : map.entrySet()) {
+            ent.getValue().getInputStream().close();
+        }
     }
     
     @Test

Modified: cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java (original)
+++ cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java Fri Sep 10 18:22:05 2010
@@ -31,6 +31,7 @@ import org.apache.cxf.factory_pattern.Nu
 import org.apache.cxf.factory_pattern.NumberFactory;
 import org.apache.cxf.factory_pattern.NumberFactoryService;
 import org.apache.cxf.factory_pattern.NumberService;
+import org.apache.cxf.frontend.ClientProxy;
 import org.apache.cxf.jaxws.ServiceImpl;
 import org.apache.cxf.jaxws.support.ServiceDelegateAccessor;
 import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -100,10 +101,14 @@ public class MultiplexClientServerTest e
            
         Number num =  (Number)serviceImpl.getPort(numberTwoRef, Number.class);
         assertTrue("20 is even", num.isEven().isEven());
+        ClientProxy.getClient(num).destroy();
         
         W3CEndpointReference numberTwentyThreeRef = factory.create("23");
         num =  (Number)serviceImpl.getPort(numberTwentyThreeRef, Number.class);
         assertTrue("23 is not even", !num.isEven().isEven());
+        ClientProxy.getClient(num).destroy();
+
+        ClientProxy.getClient(factory).destroy();
     }
     
     @Test
@@ -132,10 +137,14 @@ public class MultiplexClientServerTest e
             assertTrue("match on exception message " + expected.getMessage(),
                        expected.getMessage().indexOf("999") != -1);
         }
+        ClientProxy.getClient(num).destroy();
         
         ref = factory.create("37");
         assertNotNull("reference", ref);
         num =  (Number)serviceImpl.getPort(ref, Number.class);
         assertTrue("37 is not even", !num.isEven().isEven());
+        ClientProxy.getClient(num).destroy();
+
+        ClientProxy.getClient(factory).destroy();
     }
 }

Modified: cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java (original)
+++ cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java Fri Sep 10 18:22:05 2010
@@ -91,6 +91,8 @@ public class MtomServerTest extends Abst
         conduit.setMessageObserver(obs);
 
         Message m = new MessageImpl();
+        m.setExchange(new ExchangeImpl());
+        m.getExchange().put(Bus.class, getBus());
         String ct = "multipart/related; type=\"application/xop+xml\"; "
                     + "start=\"<so...@xfire.codehaus.org>\"; "
                     + "start-info=\"text/xml\"; "
@@ -157,6 +159,8 @@ public class MtomServerTest extends Abst
         conduit.setMessageObserver(obs);
 
         Message m = new MessageImpl();
+        m.setExchange(new ExchangeImpl());
+        m.getExchange().put(Bus.class, getBus());
         String ct = "multipart/related; type=\"application/xop+xml\"; "
                     + "start=\"<so...@xfire.codehaus.org>\"; "
                     + "start-info=\"text/xml; charset=utf-8\"; "
@@ -182,6 +186,8 @@ public class MtomServerTest extends Abst
 
         byte[] res = obs.getResponseStream().toByteArray();
         MessageImpl resMsg = new MessageImpl();
+        resMsg.setExchange(new ExchangeImpl());
+        resMsg.getExchange().put(Bus.class, getBus());
         resMsg.setContent(InputStream.class, new ByteArrayInputStream(res));
         resMsg.put(Message.CONTENT_TYPE, obs.getResponseContentType());
         resMsg.setExchange(new ExchangeImpl());